You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/13 00:47:32 UTC
[pinot] branch master updated: Fix the bug where uploaded segments cannot be deleted on real-time table (#9579)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 97bff5916d Fix the bug where uploaded segments cannot be deleted on real-time table (#9579)
97bff5916d is described below
commit 97bff5916d1182347225bf98b2fb17a403fdc8b8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Oct 12 17:47:26 2022 -0700
Fix the bug where uploaded segments cannot be deleted on real-time table (#9579)
---
.../api/resources/PinotSegmentRestletResource.java | 32 +++++++++----------
.../controller/helix/ControllerRequestClient.java | 36 ++++++++++++++++++++++
.../pinot/controller/helix/ControllerTest.java | 16 ++++++++++
.../UpsertTableSegmentUploadIntegrationTest.java | 20 +++++++++++-
4 files changed, 87 insertions(+), 17 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 5e94c29ac5..3badc1f988 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -466,9 +466,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload")
@DefaultValue("false") boolean forceDownload) {
segmentName = URIUtils.decode(segmentName);
- TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
- String tableNameWithType =
- ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+ String tableNameWithType = getExistingTable(tableName, segmentName);
Pair<Integer, String> msgInfo =
_pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
boolean zkJobMetaWriteSuccess = false;
@@ -494,6 +492,19 @@ public class PinotSegmentRestletResource {
}
}
+ /**
+ * Helper method to find the existing table based on the given table name (with or without type suffix) and segment
+ * name.
+ */
+ private String getExistingTable(String tableName, String segmentName) {
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableType == null) {
+ // Derive table type from segment name if the given table name doesn't have type suffix
+ tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
+ }
+ return ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+ }
+
/**
* Resets the segment of the table, by disabling and then enabling it.
* This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to
@@ -796,9 +807,7 @@ public class PinotSegmentRestletResource {
+ "Using 0d or -1d will instantly delete segments without retention")
@QueryParam("retention") String retentionPeriod) {
segmentName = URIUtils.decode(segmentName);
- TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
- String tableNameWithType =
- ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+ String tableNameWithType = getExistingTable(tableName, segmentName);
deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), retentionPeriod);
return new SuccessResponse("Segment deleted");
}
@@ -843,16 +852,7 @@ public class PinotSegmentRestletResource {
if (numSegments == 0) {
throw new ControllerApplicationException(LOGGER, "Segments must be provided", Status.BAD_REQUEST);
}
- boolean isRealtimeSegment = SegmentName.isRealtimeSegmentName(segments.get(0));
- for (int i = 1; i < numSegments; i++) {
- if (SegmentName.isRealtimeSegmentName(segments.get(i)) != isRealtimeSegment) {
- throw new ControllerApplicationException(LOGGER, "All segments must be of the same type (OFFLINE|REALTIME)",
- Status.BAD_REQUEST);
- }
- }
- TableType tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE;
- String tableNameWithType =
- ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+ String tableNameWithType = getExistingTable(tableName, segments.get(0));
deleteSegmentsInternal(tableNameWithType, segments, retentionPeriod);
if (numSegments <= 5) {
return new SuccessResponse("Deleted segments: " + segments + " from table: " + tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index fb4978d1dd..2da40ef3e5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
@@ -180,6 +185,37 @@ public class ControllerRequestClient {
}
}
+ public List<String> listSegments(String tableName, @Nullable String tableType, boolean excludeReplacedSegments)
+ throws IOException {
+ String url = _controllerRequestURLBuilder.forSegmentListAPI(tableName, tableType, excludeReplacedSegments);
+ try {
+ SimpleHttpResponse resp = HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URL(url).toURI()));
+ // Example response: (list of map from table type to segments)
+ // [{"REALTIME":["mytable__0__0__20221012T1952Z","mytable__1__0__20221012T1952Z"]}]
+ JsonNode jsonNode = JsonUtils.stringToJsonNode(resp.getResponse());
+ List<String> segments = new ArrayList<>();
+ for (JsonNode tableNode : jsonNode) {
+ ArrayNode segmentsNode = (ArrayNode) tableNode.elements().next();
+ for (JsonNode segmentNode : segmentsNode) {
+ segments.add(segmentNode.asText());
+ }
+ }
+ return segments;
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void deleteSegment(String tableName, String segmentName)
+ throws IOException {
+ try {
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(
+ new URL(_controllerRequestURLBuilder.forSegmentDelete(tableName, segmentName)).toURI()));
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
public void deleteSegments(String tableName, TableType tableType)
throws IOException {
try {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index cadead8f70..68731ed806 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -655,6 +656,21 @@ public class ControllerTest {
getControllerRequestClient().deleteTable(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
+ public List<String> listSegments(String tableName)
+ throws IOException {
+ return listSegments(tableName, null, false);
+ }
+
+ public List<String> listSegments(String tableName, @Nullable String tableType, boolean excludeReplacedSegments)
+ throws IOException {
+ return getControllerRequestClient().listSegments(tableName, tableType, excludeReplacedSegments);
+ }
+
+ public void dropSegment(String tableName, String segmentName)
+ throws IOException {
+ getControllerRequestClient().deleteSegment(tableName, segmentName);
+ }
+
public void dropAllSegments(String tableName, TableType tableType)
throws IOException {
getControllerRequestClient().deleteSegments(tableName, tableType);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index e6f5ff248f..534c716b58 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -37,6 +37,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
@@ -87,7 +88,24 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
@AfterClass
public void tearDown()
throws IOException {
- dropRealtimeTable(getTableName());
+ String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+
+ // Test dropping all segments one by one
+ List<String> segments = listSegments(realtimeTableName);
+ assertFalse(segments.isEmpty());
+ for (String segment : segments) {
+ dropSegment(realtimeTableName, segment);
+ }
+ // NOTE: There is a delay to remove the segment from property store
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ return listSegments(realtimeTableName).isEmpty();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60_000L, "Failed to drop the segments");
+
+ dropRealtimeTable(realtimeTableName);
stopServer();
stopBroker();
stopController();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org