You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/13 00:47:32 UTC

[pinot] branch master updated: Fix the bug where uploaded segments cannot be deleted on real-time table (#9579)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 97bff5916d Fix the bug where uploaded segments cannot be deleted on real-time table (#9579)
97bff5916d is described below

commit 97bff5916d1182347225bf98b2fb17a403fdc8b8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Oct 12 17:47:26 2022 -0700

    Fix the bug where uploaded segments cannot be deleted on real-time table (#9579)
---
 .../api/resources/PinotSegmentRestletResource.java | 32 +++++++++----------
 .../controller/helix/ControllerRequestClient.java  | 36 ++++++++++++++++++++++
 .../pinot/controller/helix/ControllerTest.java     | 16 ++++++++++
 .../UpsertTableSegmentUploadIntegrationTest.java   | 20 +++++++++++-
 4 files changed, 87 insertions(+), 17 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 5e94c29ac5..3badc1f988 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -466,9 +466,7 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload")
       @DefaultValue("false") boolean forceDownload) {
     segmentName = URIUtils.decode(segmentName);
-    TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
-    String tableNameWithType =
-        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+    String tableNameWithType = getExistingTable(tableName, segmentName);
     Pair<Integer, String> msgInfo =
         _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
     boolean zkJobMetaWriteSuccess = false;
@@ -494,6 +492,19 @@ public class PinotSegmentRestletResource {
     }
   }
 
+  /**
+   * Helper method to find the existing table based on the given table name (with or without type suffix) and segment
+   * name.
+   */
+  private String getExistingTable(String tableName, String segmentName) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType == null) {
+      // Derive table type from segment name if the given table name doesn't have type suffix
+      tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
+    }
+    return ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+  }
+
   /**
    * Resets the segment of the table, by disabling and then enabling it.
    * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to
@@ -796,9 +807,7 @@ public class PinotSegmentRestletResource {
           + "Using 0d or -1d will instantly delete segments without retention")
       @QueryParam("retention") String retentionPeriod) {
     segmentName = URIUtils.decode(segmentName);
-    TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
-    String tableNameWithType =
-        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+    String tableNameWithType = getExistingTable(tableName, segmentName);
     deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), retentionPeriod);
     return new SuccessResponse("Segment deleted");
   }
@@ -843,16 +852,7 @@ public class PinotSegmentRestletResource {
     if (numSegments == 0) {
       throw new ControllerApplicationException(LOGGER, "Segments must be provided", Status.BAD_REQUEST);
     }
-    boolean isRealtimeSegment = SegmentName.isRealtimeSegmentName(segments.get(0));
-    for (int i = 1; i < numSegments; i++) {
-      if (SegmentName.isRealtimeSegmentName(segments.get(i)) != isRealtimeSegment) {
-        throw new ControllerApplicationException(LOGGER, "All segments must be of the same type (OFFLINE|REALTIME)",
-            Status.BAD_REQUEST);
-      }
-    }
-    TableType tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE;
-    String tableNameWithType =
-        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
+    String tableNameWithType = getExistingTable(tableName, segments.get(0));
     deleteSegmentsInternal(tableNameWithType, segments, retentionPeriod);
     if (numSegments <= 5) {
       return new SuccessResponse("Deleted segments: " + segments + " from table: " + tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index fb4978d1dd..2da40ef3e5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.http.HttpClient;
@@ -180,6 +185,37 @@ public class ControllerRequestClient {
     }
   }
 
+  public List<String> listSegments(String tableName, @Nullable String tableType, boolean excludeReplacedSegments)
+      throws IOException {
+    String url = _controllerRequestURLBuilder.forSegmentListAPI(tableName, tableType, excludeReplacedSegments);
+    try {
+      SimpleHttpResponse resp = HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URL(url).toURI()));
+      // Example response: (list of map from table type to segments)
+      // [{"REALTIME":["mytable__0__0__20221012T1952Z","mytable__1__0__20221012T1952Z"]}]
+      JsonNode jsonNode = JsonUtils.stringToJsonNode(resp.getResponse());
+      List<String> segments = new ArrayList<>();
+      for (JsonNode tableNode : jsonNode) {
+        ArrayNode segmentsNode = (ArrayNode) tableNode.elements().next();
+        for (JsonNode segmentNode : segmentsNode) {
+          segments.add(segmentNode.asText());
+        }
+      }
+      return segments;
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void deleteSegment(String tableName, String segmentName)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(
+          new URL(_controllerRequestURLBuilder.forSegmentDelete(tableName, segmentName)).toURI()));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void deleteSegments(String tableName, TableType tableType)
       throws IOException {
     try {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index cadead8f70..68731ed806 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -655,6 +656,21 @@ public class ControllerTest {
     getControllerRequestClient().deleteTable(TableNameBuilder.REALTIME.tableNameWithType(tableName));
   }
 
+  public List<String> listSegments(String tableName)
+    throws IOException {
+    return listSegments(tableName, null, false);
+  }
+
+  public List<String> listSegments(String tableName, @Nullable String tableType, boolean excludeReplacedSegments)
+      throws IOException {
+    return getControllerRequestClient().listSegments(tableName, tableType, excludeReplacedSegments);
+  }
+
+  public void dropSegment(String tableName, String segmentName)
+      throws IOException {
+    getControllerRequestClient().deleteSegment(tableName, segmentName);
+  }
+
   public void dropAllSegments(String tableName, TableType tableType)
       throws IOException {
     getControllerRequestClient().deleteSegments(tableName, tableType);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index e6f5ff248f..534c716b58 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -37,6 +37,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 
 
 public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
@@ -87,7 +88,24 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
   @AfterClass
   public void tearDown()
       throws IOException {
-    dropRealtimeTable(getTableName());
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+
+    // Test dropping all segments one by one
+    List<String> segments = listSegments(realtimeTableName);
+    assertFalse(segments.isEmpty());
+    for (String segment : segments) {
+      dropSegment(realtimeTableName, segment);
+    }
+    // NOTE: There is a delay to remove the segment from property store
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        return listSegments(realtimeTableName).isEmpty();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 60_000L, "Failed to drop the segments");
+
+    dropRealtimeTable(realtimeTableName);
     stopServer();
     stopBroker();
     stopController();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org