You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/11/12 02:53:55 UTC
[pinot] branch master updated: Add forceCleanup option for
'startReplaceSegments' API (#7744)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e3d238a Add forceCleanup option for 'startReplaceSegments' API (#7744)
e3d238a is described below
commit e3d238ac1d8633331d9507713266e41e6b40f870
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Thu Nov 11 18:52:49 2021 -0800
Add forceCleanup option for 'startReplaceSegments' API (#7744)
---
.../PinotSegmentUploadDownloadRestletResource.java | 3 +-
.../helix/core/PinotHelixResourceManager.java | 133 ++++++++++++--------
.../helix/core/PinotHelixResourceManagerTest.java | 134 ++++++++++++++++-----
3 files changed, 189 insertions(+), 81 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 78de2fd..1a75bfb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -545,6 +545,7 @@ public class PinotSegmentUploadDownloadRestletResource {
public Response startReplaceSegments(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "Force cleanup") @QueryParam("forceCleanup") @DefaultValue("false") boolean forceCleanup,
StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
try {
TableType tableType = Constants.validateTableType(tableTypeStr);
@@ -555,7 +556,7 @@ public class PinotSegmentUploadDownloadRestletResource {
String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
String segmentLineageEntryId = _pinotHelixResourceManager
.startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(),
- startReplaceSegmentsRequest.getSegmentsTo());
+ startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
return Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", segmentLineageEntryId)).build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 22b6872..e32506b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2747,11 +2747,13 @@ public class PinotHelixResourceManager {
* @param tableNameWithType Table name with type
* @param segmentsFrom a list of segments to be merged
* @param segmentsTo a list of merged segments
+ * @param forceCleanup True for enabling the force segment cleanup
* @return Segment lineage entry id
*
* @throws InvalidConfigException
*/
- public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+ public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo,
+ boolean forceCleanup) {
// Create a segment lineage entry id
String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId();
@@ -2786,37 +2788,61 @@ public class PinotHelixResourceManager {
Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId) == null,
String.format("SegmentLineageEntryId (%s) already exists in the segment lineage.", segmentLineageEntryId));
+ List<String> segmentsToCleanUp = new ArrayList<>();
for (String entryId : segmentLineage.getLineageEntryIds()) {
LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
- // If segment entry is in 'REVERTED' state, no need to check for 'segmentsFrom'.
- if (lineageEntry.getState() != LineageEntryState.REVERTED) {
+ // If the lineage entry is in 'REVERTED' state, no need to go through the validation because we can regard
+ // the entry as not existing.
+ if (lineageEntry.getState() == LineageEntryState.REVERTED) {
+ continue;
+ }
+
+ // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
+
+ // When 'forceCleanup' is enabled, we need to proactively revert the lineage entry when we find the lineage
+ // entry with the same 'segmentFrom' values.
+ if (forceCleanup && lineageEntry.getState() == LineageEntryState.IN_PROGRESS && CollectionUtils
+ .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) {
+ // Update segment lineage entry to 'REVERTED'
+ updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, entryId, lineageEntry);
+
+ // Add segments for proactive clean-up.
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+ } else {
// Check that any segment from 'segmentsFrom' does not appear twice.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String
- .format(
- "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
- + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
- lineageEntry.getSegmentsFrom(), segmentsFrom));
+ .format("It is not allowed to replace segments that are already replaced. (tableName = %s, "
+ + "segmentsFrom from the existing lineage entry = %s, requested segmentsFrom = %s)",
+ tableNameWithType, lineageEntry.getSegmentsFrom(), segmentsFrom));
+
+ // Check that any segment from 'segmentTo' does not appear twice.
+ Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+ "It is not allowed to have the same segment name for segments in 'segmentsTo'. (tableName = %s, "
+ + "segmentsTo from the existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+ lineageEntry.getSegmentsTo(), segmentsTo));
}
-
- // Check that merged segments name cannot be the same.
- Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
- "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
- + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
- lineageEntry.getSegmentsTo(), segmentsTo));
}
// Update lineage entry
segmentLineage.addLineageEntry(segmentLineageEntryId,
new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
- // Write back to the lineage entry
- return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+ // Write back to the lineage entry to the property store
+ if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
+ // Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it
+ // is safe to physically delete segments.
+ if (!segmentsToCleanUp.isEmpty()) {
+ deleteSegments(tableNameWithType, segmentsToCleanUp);
+ }
+ return true;
+ } else {
+ return false;
+ }
});
} catch (Exception e) {
- String errorMsg = String
- .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
- tableNameWithType, segmentsFrom, segmentsTo);
+ String errorMsg = String.format("Failed to update the segment lineage during startReplaceSegments. "
+ + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)", tableNameWithType, segmentsFrom, segmentsTo);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
@@ -2860,10 +2886,11 @@ public class PinotHelixResourceManager {
.format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType,
segmentLineageEntryId));
- // NO-OPS if the entry is already completed
- if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
- LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, "
- + "segmentLineageEntryId={})", tableNameWithType, segmentLineageEntryId);
+ // NO-OPS if the entry is already 'COMPLETED' or 'REVERTED'
+ if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS) {
+ LOGGER.warn("Lineage entry state is not 'IN_PROGRESS'. Cannot update to 'COMPLETED'. (tableNameWithType={}, "
+ + "segmentLineageEntryId={}, state={})", tableNameWithType, segmentLineageEntryId,
+ lineageEntry.getState());
return true;
}
@@ -2901,9 +2928,8 @@ public class PinotHelixResourceManager {
}
});
} catch (Exception e) {
- String errorMsg = String
- .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)",
- tableNameWithType, segmentLineageEntryId);
+ String errorMsg = String.format("Failed to update the segment lineage during endReplaceSegments. "
+ + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
@@ -2943,29 +2969,19 @@ public class PinotHelixResourceManager {
.format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType,
segmentLineageEntryId));
- if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
- // We do not allow to revert the lineage entry with 'REVERTED' state. For 'IN_PROGRESS", we only allow to
- // revert when 'forceRevert' is set to true.
- if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS || !forceRevert) {
- LOGGER.warn("Lineage state is not valid. Cannot revert the lineage entry. (tableNameWithType={}, "
- + "segmentLineageEntryId={}, segmentLineageEntrySate={}, forceRevert={})", tableNameWithType,
- segmentLineageEntryId, lineageEntry.getState(), forceRevert);
- return false;
- }
+ // We do not allow to revert the lineage entry with 'REVERTED' state. For 'IN_PROGRESS", we only allow to
+ // revert when 'forceRevert' is set to true.
+ if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+ lineageEntry.getState() == LineageEntryState.IN_PROGRESS && !forceRevert)) {
+ String errorMsg = String.format(
+ "Lineage state is not valid. Cannot update the lineage entry to be 'REVERTED'. (tableNameWithType=%s, "
+ + "segmentLineageEntryId=%s, segmentLineageEntryState=%s, forceRevert=%s)", tableNameWithType,
+ segmentLineageEntryId, lineageEntry.getState(), forceRevert);
+ throw new RuntimeException(errorMsg);
}
- // Check that all segments from 'segmentsFrom' are in ONLINE state in the external view.
- Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
- Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format(
- "Not all segments from 'segmentFrom' are in ONLINE state in the external view. (tableName = '%s', "
- + "segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType, lineageEntry.getSegmentsFrom(),
- onlineSegments));
-
- // Update lineage entry
- LineageEntry newLineageEntry =
- new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
- System.currentTimeMillis());
- segmentLineage.updateLineageEntry(segmentLineageEntryId, newLineageEntry);
+ // Update segment lineage entry to 'REVERTED'
+ updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, segmentLineageEntryId, lineageEntry);
// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) {
@@ -2973,15 +2989,19 @@ public class PinotHelixResourceManager {
// routing table because it is possible that there has been no EV change but the routing result may be
// different after updating the lineage entry.
sendRoutingTableRebuildMessage(tableNameWithType);
+
+ // Invoke the proactive clean-up for segments that we no longer needs in case 'forceRevert' is enabled
+ if (forceRevert) {
+ deleteSegments(tableNameWithType, lineageEntry.getSegmentsTo());
+ }
return true;
} else {
return false;
}
});
} catch (Exception e) {
- String errorMsg = String
- .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)",
- tableNameWithType, segmentLineageEntryId);
+ String errorMsg = String.format("Failed to update the segment lineage during revertReplaceSegments. "
+ + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
@@ -2991,6 +3011,21 @@ public class PinotHelixResourceManager {
tableNameWithType, segmentLineageEntryId);
}
+ private void updateSegmentLineageEntryToReverted(String tableNameWithType, SegmentLineage segmentLineage,
+ String segmentLineageEntryId, LineageEntry lineageEntry) {
+ // Check that all segments from 'segmentsFrom' are in ONLINE state in the external view.
+ Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType);
+ Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format(
+ "Failed to update the lineage to be 'REVERTED'. Not all segments from 'segmentFrom' are in ONLINE state "
+ + "in the external view. (tableName = '%s', segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType,
+ lineageEntry.getSegmentsFrom(), onlineSegments));
+
+ // Update lineage entry
+ segmentLineage.updateLineageEntry(segmentLineageEntryId,
+ new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
+ System.currentTimeMillis()));
+ }
+
private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> segmentsToCheck)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 89e98b0..ae524a6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -471,7 +471,7 @@ public class PinotHelixResourceManagerTest {
List<String> segmentsTo = Arrays.asList("s5", "s6");
String lineageEntryId = ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
SegmentLineage segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
@@ -484,7 +484,7 @@ public class PinotHelixResourceManagerTest {
segmentsTo = Arrays.asList("s3", "s4");
try {
ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
} catch (Exception e) {
// expected
}
@@ -492,34 +492,16 @@ public class PinotHelixResourceManagerTest {
segmentsTo = Arrays.asList("s2");
try {
ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
} catch (Exception e) {
// expected
}
// Check invalid segmentsFrom
segmentsFrom = Arrays.asList("s1", "s6");
- segmentsTo = Arrays.asList("merged1", "merged2");
try {
ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
- } catch (Exception e) {
- // expected
- }
-
- segmentsFrom = Arrays.asList("s1", "s2");
- String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
- segmentLineage = SegmentLineageAccessHelper
- .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
- Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom);
- Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), segmentsTo);
- Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS);
-
- try {
- ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
} catch (Exception e) {
// expected
}
@@ -556,27 +538,117 @@ public class PinotHelixResourceManagerTest {
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), new ArrayList<>());
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), Arrays.asList("s5", "s6"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.COMPLETED);
+ // Start the new segment replacement
+ segmentsFrom = Arrays.asList("s1", "s2");
+ segmentsTo = Arrays.asList("merged_t1_0", "merged_t1_1");
+ String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
+ Arrays.asList("merged_t1_0", "merged_t1_1"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS);
+
+ // Upload partial data
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged1"),
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t1_0"),
+ "downloadUrl");
+
+ IdealState idealState =
+ ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertTrue(!idealState.getInstanceSet("merged_t1_0").isEmpty());
+
+ // Try to revert the entry with partial data uploaded without forceRevert
+ try {
+ ControllerTestUtils.getHelixResourceManager()
+ .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, false);
+ } catch (Exception e) {
+ // expected
+ }
+
+ // Try to revert the entry with partial data uploaded with forceRevert
+ ControllerTestUtils.getHelixResourceManager()
+ .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, true);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.REVERTED);
+
+ // 'merged_t1_0' segment should be proactively cleaned up
+ idealState =
+ ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertTrue(idealState.getInstanceSet("merged_t1_0").isEmpty());
+
+ // Start new segment replacement since the above entry is reverted
+ segmentsFrom = Arrays.asList("s1", "s2");
+ segmentsTo = Arrays.asList("merged_t2_0", "merged_t2_1");
+ String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), segmentsFrom);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(), segmentsTo);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.IN_PROGRESS);
+
+ // Upload partial data
+ ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t2_0"),
+ "downloadUrl");
+
+ // Without force cleanup, 'startReplaceSegments' should fail because of duplicate segments on 'segmentFrom'.
+ segmentsTo = Arrays.asList("merged_t3_0", "merged_t3_1");
+ try {
+ ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+ } catch (Exception e) {
+ // expected
+ }
+
+ // Test force clean up case
+ String lineageEntryId4 = ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, true);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(),
+ Arrays.asList("merged_t2_0", "merged_t2_1"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.REVERTED);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(),
+ Arrays.asList("merged_t3_0", "merged_t3_1"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(), LineageEntryState.IN_PROGRESS);
+
+ // 'merged_t2_0' segment should be proactively cleaned up
+ idealState =
+ ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertTrue(idealState.getInstanceSet("merged_t2_0").isEmpty());
+
+ // Upload segments again
+ ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t3_0"),
"downloadUrl");
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged2"),
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t3_1"),
"downloadUrl");
+ // Finish the replacement
ControllerTestUtils.getHelixResourceManager()
- .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2);
+ .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId4);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
- Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2"));
- Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
- Arrays.asList("merged1", "merged2"));
- Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.COMPLETED);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4);
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(),
+ Arrays.asList("merged_t3_0", "merged_t3_1"));
+ Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(), LineageEntryState.COMPLETED);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org