You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/11/12 02:53:55 UTC

[pinot] branch master updated: Add forceCleanup option for 'startReplaceSegments' API (#7744)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e3d238a  Add forceCleanup option for 'startReplaceSegments' API (#7744)
e3d238a is described below

commit e3d238ac1d8633331d9507713266e41e6b40f870
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Thu Nov 11 18:52:49 2021 -0800

    Add forceCleanup option for 'startReplaceSegments' API (#7744)
---
 .../PinotSegmentUploadDownloadRestletResource.java |   3 +-
 .../helix/core/PinotHelixResourceManager.java      | 133 ++++++++++++--------
 .../helix/core/PinotHelixResourceManagerTest.java  | 134 ++++++++++++++++-----
 3 files changed, 189 insertions(+), 81 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 78de2fd..1a75bfb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -545,6 +545,7 @@ public class PinotSegmentUploadDownloadRestletResource {
   public Response startReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Force cleanup") @QueryParam("forceCleanup") @DefaultValue("false") boolean forceCleanup,
       StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
     try {
       TableType tableType = Constants.validateTableType(tableTypeStr);
@@ -555,7 +556,7 @@ public class PinotSegmentUploadDownloadRestletResource {
       String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
       String segmentLineageEntryId = _pinotHelixResourceManager
           .startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(),
-              startReplaceSegmentsRequest.getSegmentsTo());
+              startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
       return Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", segmentLineageEntryId)).build();
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 22b6872..e32506b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2747,11 +2747,13 @@ public class PinotHelixResourceManager {
    * @param tableNameWithType Table name with type
    * @param segmentsFrom a list of segments to be merged
    * @param segmentsTo a list of merged segments
+   * @param forceCleanup If true, revert 'IN_PROGRESS' entries with equal 'segmentsFrom' and delete their 'segmentsTo'
    * @return Segment lineage entry id
    *
    * @throws InvalidConfigException
    */
-  public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+  public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo,
+      boolean forceCleanup) {
     // Create a segment lineage entry id
     String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId();
 
@@ -2786,37 +2788,61 @@ public class PinotHelixResourceManager {
         Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId) == null,
             String.format("SegmentLineageEntryId (%s) already exists in the segment lineage.", segmentLineageEntryId));
 
+        List<String> segmentsToCleanUp = new ArrayList<>();
         for (String entryId : segmentLineage.getLineageEntryIds()) {
           LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
 
-          // If segment entry is in 'REVERTED' state, no need to check for 'segmentsFrom'.
-          if (lineageEntry.getState() != LineageEntryState.REVERTED) {
+          // If the lineage entry is in 'REVERTED' state, no need to go through the validation because we can regard
+          // the entry as not existing.
+          if (lineageEntry.getState() == LineageEntryState.REVERTED) {
+            continue;
+          }
+
+          // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
+
+          // When 'forceCleanup' is enabled, proactively revert any 'IN_PROGRESS' lineage entry that has the same
+          // 'segmentsFrom' values as this request.
+          if (forceCleanup && lineageEntry.getState() == LineageEntryState.IN_PROGRESS && CollectionUtils
+              .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) {
+            // Update segment lineage entry to 'REVERTED'
+            updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, entryId, lineageEntry);
+
+            // Add segments for proactive clean-up.
+            segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+          } else {
             // Check that any segment from 'segmentsFrom' does not appear twice.
             Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String
-                .format(
-                    "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
-                        + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
-                    lineageEntry.getSegmentsFrom(), segmentsFrom));
+                .format("It is not allowed to replace segments that are already replaced. (tableName = %s, "
+                        + "segmentsFrom from the existing lineage entry = %s, requested segmentsFrom = %s)",
+                    tableNameWithType, lineageEntry.getSegmentsFrom(), segmentsFrom));
+
+            // Check that any segment from 'segmentsTo' does not appear twice.
+            Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+                "It is not allowed to have the same segment name for segments in 'segmentsTo'. (tableName = %s, "
+                    + "segmentsTo from the existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+                lineageEntry.getSegmentsTo(), segmentsTo));
           }
-
-          // Check that merged segments name cannot be the same.
-          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
-              "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
-                  + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
-              lineageEntry.getSegmentsTo(), segmentsTo));
         }
 
         // Update lineage entry
         segmentLineage.addLineageEntry(segmentLineageEntryId,
             new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
 
-        // Write back to the lineage entry
-        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+        // Write the segment lineage back to the property store
+        if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
+          // Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it
+          // is safe to physically delete segments.
+          if (!segmentsToCleanUp.isEmpty()) {
+            deleteSegments(tableNameWithType, segmentsToCleanUp);
+          }
+          return true;
+        } else {
+          return false;
+        }
       });
     } catch (Exception e) {
-      String errorMsg = String
-          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
-              tableNameWithType, segmentsFrom, segmentsTo);
+      String errorMsg = String.format("Failed to update the segment lineage during startReplaceSegments. "
+          + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)", tableNameWithType, segmentsFrom, segmentsTo);
       LOGGER.error(errorMsg, e);
       throw new RuntimeException(errorMsg, e);
     }
@@ -2860,10 +2886,11 @@ public class PinotHelixResourceManager {
             .format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType,
                 segmentLineageEntryId));
 
-        // NO-OPS if the entry is already completed
-        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
-          LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, "
-              + "segmentLineageEntryId={})", tableNameWithType, segmentLineageEntryId);
+        // No-op if the entry is already 'COMPLETED' or 'REVERTED'
+        if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS) {
+          LOGGER.warn("Lineage entry state is not 'IN_PROGRESS'. Cannot update to 'COMPLETED'. (tableNameWithType={}, "
+                  + "segmentLineageEntryId={}, state={})", tableNameWithType, segmentLineageEntryId,
+              lineageEntry.getState());
           return true;
         }
 
@@ -2901,9 +2928,8 @@ public class PinotHelixResourceManager {
         }
       });
     } catch (Exception e) {
-      String errorMsg = String
-          .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)",
-              tableNameWithType, segmentLineageEntryId);
+      String errorMsg = String.format("Failed to update the segment lineage during endReplaceSegments. "
+          + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
       LOGGER.error(errorMsg, e);
       throw new RuntimeException(errorMsg, e);
     }
@@ -2943,29 +2969,19 @@ public class PinotHelixResourceManager {
             .format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType,
                 segmentLineageEntryId));
 
-        if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
-          // We do not allow to revert the lineage entry with 'REVERTED' state. For 'IN_PROGRESS", we only allow to
-          // revert when 'forceRevert' is set to true.
-          if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS || !forceRevert) {
-            LOGGER.warn("Lineage state is not valid. Cannot revert the lineage entry. (tableNameWithType={}, "
-                    + "segmentLineageEntryId={}, segmentLineageEntrySate={}, forceRevert={})", tableNameWithType,
-                segmentLineageEntryId, lineageEntry.getState(), forceRevert);
-            return false;
-          }
+        // Reverting a lineage entry that is already in 'REVERTED' state is not allowed. An 'IN_PROGRESS' entry can
+        // only be reverted when 'forceRevert' is set to true.
+        if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+            lineageEntry.getState() == LineageEntryState.IN_PROGRESS && !forceRevert)) {
+          String errorMsg = String.format(
+              "Lineage state is not valid. Cannot update the lineage entry to be 'REVERTED'. (tableNameWithType=%s, "
+                  + "segmentLineageEntryId=%s, segmentLineageEntryState=%s, forceRevert=%s)", tableNameWithType,
+              segmentLineageEntryId, lineageEntry.getState(), forceRevert);
+          throw new RuntimeException(errorMsg);
         }
 
-        // Check that all segments from 'segmentsFrom' are in ONLINE state in the external view.
-        Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
-        Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format(
-            "Not all segments from 'segmentFrom' are in ONLINE state in the external view. (tableName = '%s', "
-                + "segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType, lineageEntry.getSegmentsFrom(),
-            onlineSegments));
-
-        // Update lineage entry
-        LineageEntry newLineageEntry =
-            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
-                System.currentTimeMillis());
-        segmentLineage.updateLineageEntry(segmentLineageEntryId, newLineageEntry);
+        // Update segment lineage entry to 'REVERTED'
+        updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, segmentLineageEntryId, lineageEntry);
 
         // Write back to the lineage entry
         if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
@@ -2973,15 +2989,19 @@ public class PinotHelixResourceManager {
           // routing table because it is possible that there has been no EV change but the routing result may be
           // different after updating the lineage entry.
           sendRoutingTableRebuildMessage(tableNameWithType);
+
+          // Proactively clean up the segments that are no longer needed when 'forceRevert' is enabled
+          if (forceRevert) {
+            deleteSegments(tableNameWithType, lineageEntry.getSegmentsTo());
+          }
           return true;
         } else {
           return false;
         }
       });
     } catch (Exception e) {
-      String errorMsg = String
-          .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)",
-              tableNameWithType, segmentLineageEntryId);
+      String errorMsg = String.format("Failed to update the segment lineage during revertReplaceSegments. "
+          + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
       LOGGER.error(errorMsg, e);
       throw new RuntimeException(errorMsg, e);
     }
@@ -2991,6 +3011,21 @@ public class PinotHelixResourceManager {
         tableNameWithType, segmentLineageEntryId);
   }
 
+  private void updateSegmentLineageEntryToReverted(String tableNameWithType, SegmentLineage segmentLineage,
+      String segmentLineageEntryId, LineageEntry lineageEntry) {
+    // Precondition: all 'segmentsFrom' must be ONLINE in the external view; otherwise checkArgument throws.
+    Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
+    Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format(
+        "Failed to update the lineage to be 'REVERTED'. Not all segments from 'segmentFrom' are in ONLINE state "
+            + "in the external view. (tableName = '%s', segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType,
+        lineageEntry.getSegmentsFrom(), onlineSegments));
+
+    // Replace the entry with a 'REVERTED' copy stamped with the current time; writing it back is left to callers.
+    segmentLineage.updateLineageEntry(segmentLineageEntryId,
+        new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
+            System.currentTimeMillis()));
+  }
+
   private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> segmentsToCheck)
       throws InterruptedException, TimeoutException {
     long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 89e98b0..ae524a6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -471,7 +471,7 @@ public class PinotHelixResourceManagerTest {
     List<String> segmentsTo = Arrays.asList("s5", "s6");
 
     String lineageEntryId = ControllerTestUtils.getHelixResourceManager()
-        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
     SegmentLineage segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
     Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
@@ -484,7 +484,7 @@ public class PinotHelixResourceManagerTest {
     segmentsTo = Arrays.asList("s3", "s4");
     try {
       ControllerTestUtils.getHelixResourceManager()
-          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
     } catch (Exception e) {
       // expected
     }
@@ -492,34 +492,16 @@ public class PinotHelixResourceManagerTest {
     segmentsTo = Arrays.asList("s2");
     try {
       ControllerTestUtils.getHelixResourceManager()
-          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
     } catch (Exception e) {
       // expected
     }
 
     // Check invalid segmentsFrom
     segmentsFrom = Arrays.asList("s1", "s6");
-    segmentsTo = Arrays.asList("merged1", "merged2");
     try {
       ControllerTestUtils.getHelixResourceManager()
-          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
-    } catch (Exception e) {
-      // expected
-    }
-
-    segmentsFrom = Arrays.asList("s1", "s2");
-    String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
-        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
-    segmentLineage = SegmentLineageAccessHelper
-        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
-    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom);
-    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), segmentsTo);
-    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS);
-
-    try {
-      ControllerTestUtils.getHelixResourceManager()
-          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
     } catch (Exception e) {
       // expected
     }
@@ -556,27 +538,117 @@ public class PinotHelixResourceManagerTest {
         .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
     Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), new ArrayList<>());
     Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), Arrays.asList("s5", "s6"));
     Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.COMPLETED);
 
+    // Start the new segment replacement
+    segmentsFrom = Arrays.asList("s1", "s2");
+    segmentsTo = Arrays.asList("merged_t1_0", "merged_t1_1");
+    String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
+        Arrays.asList("merged_t1_0", "merged_t1_1"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS);
+
+    // Upload partial data
     ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged1"),
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t1_0"),
+        "downloadUrl");
+
+    IdealState idealState =
+        ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertTrue(!idealState.getInstanceSet("merged_t1_0").isEmpty());
+
+    // Try to revert the entry with partial data uploaded without forceRevert
+    try {
+      ControllerTestUtils.getHelixResourceManager()
+          .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, false);
+    } catch (Exception e) {
+      // expected
+    }
+
+    // Try to revert the entry with partial data uploaded with forceRevert
+    ControllerTestUtils.getHelixResourceManager()
+        .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, true);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.REVERTED);
+
+    // 'merged_t1_0' segment should be proactively cleaned up
+    idealState =
+        ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertTrue(idealState.getInstanceSet("merged_t1_0").isEmpty());
+
+    // Start new segment replacement since the above entry is reverted
+    segmentsFrom = Arrays.asList("s1", "s2");
+    segmentsTo = Arrays.asList("merged_t2_0", "merged_t2_1");
+    String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager()
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), segmentsFrom);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(), segmentsTo);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.IN_PROGRESS);
+
+    // Upload partial data
+    ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t2_0"),
+        "downloadUrl");
+
+    // Without force cleanup, 'startReplaceSegments' should fail because of duplicate segments in 'segmentsFrom'.
+    segmentsTo = Arrays.asList("merged_t3_0", "merged_t3_1");
+    try {
+      ControllerTestUtils.getHelixResourceManager()
+          .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+    } catch (Exception e) {
+      // expected
+    }
+
+    // Test force clean up case
+    String lineageEntryId4 = ControllerTestUtils.getHelixResourceManager()
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, true);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(),
+        Arrays.asList("merged_t2_0", "merged_t2_1"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.REVERTED);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(),
+        Arrays.asList("merged_t3_0", "merged_t3_1"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(), LineageEntryState.IN_PROGRESS);
+
+    // 'merged_t2_0' segment should be proactively cleaned up
+    idealState =
+        ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertTrue(idealState.getInstanceSet("merged_t2_0").isEmpty());
+
+    // Upload segments again
+    ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t3_0"),
         "downloadUrl");
     ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged2"),
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t3_1"),
         "downloadUrl");
 
+    // Finish the replacement
     ControllerTestUtils.getHelixResourceManager()
-        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2);
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId4);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
-    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2"));
-    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
-        Arrays.asList("merged1", "merged2"));
-    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.COMPLETED);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(),
+        Arrays.asList("merged_t3_0", "merged_t3_1"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(), LineageEntryState.COMPLETED);
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org