Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/07/02 00:50:42 UTC

[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8986: Support pause/resume consumption of realtime tables

mcvsubbu commented on code in PR #8986:
URL: https://github.com/apache/pinot/pull/8986#discussion_r912297405


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -49,27 +50,71 @@
     HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
 @Path("/")
 public class PinotRealtimeTableResource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeTableResource.class);
+
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
+  @Inject
+  PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+
+  @POST
+  @Path("/tables/{tableName}/pauseConsumption")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Pause consumption of a realtime table",
+      notes = "Pause the consumption of a realtime table")
+  public Response pauseConsumption(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    try {
+      return Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @POST
   @Path("/tables/{tableName}/resumeConsumption")
   @Produces(MediaType.APPLICATION_JSON)
-  @Consumes(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Resume the consumption of a realtime table",
-      notes = "Resume the consumption of a realtime table")
-  public String resumeConsumption(
-      @ApiParam(value = "Name of the table", required = true)
-      @PathParam("tableName") String tableName) throws JsonProcessingException {
-    // TODO: Add util method for invoking periodic tasks
+  @ApiOperation(value = "Resume consumption of a realtime table",
+      notes = "Resume the consumption for a realtime table")
+  public Response resumeConsumption(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
-    Map<String, String> taskProperties = new HashMap<>();
-    taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+    validate(tableNameWithType);
+    try {
+      return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 
-    Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
-        .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+  @GET
+  @Path("/tables/{tableName}/pauseStatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Return pause status of a realtime table",
+      notes = "Return pause status of a realtime table along with list of consuming segments.")
+  public Response getConsumptionStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    try {
+      return Response.ok().entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 
-    return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
-        + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}";
+  private void validate(String tableNameWithType) {
+    IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, "Ideal State is null for table " + tableNameWithType,

Review Comment:
   Maybe say "table not found"
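
   A minimal sketch of that wording, using plain Java stand-ins rather than the real Pinot classes. `ApiException`, `KNOWN_TABLES` and the 404 status are assumptions made for illustration; the diff above is cut off before the status argument, so the status the PR actually uses is not known here.

```java
import java.util.Collections;
import java.util.Set;

class ValidateSketch {
  // Hypothetical stand-in for ControllerApplicationException: a message plus an HTTP status.
  static class ApiException extends RuntimeException {
    final int status;

    ApiException(String message, int status) {
      super(message);
      this.status = status;
    }
  }

  // Hypothetical stand-in for the ideal state lookup: only these tables "exist".
  static final Set<String> KNOWN_TABLES = Collections.singleton("events_REALTIME");

  static void validate(String tableNameWithType) {
    if (!KNOWN_TABLES.contains(tableNameWithType)) {
      // Tell the caller what is wrong in their terms, not in Helix terms.
      throw new ApiException("Table " + tableNameWithType + " not found", 404);
    }
  }

  public static void main(String[] args) {
    validate("events_REALTIME");
    try {
      validate("missing_REALTIME");
    } catch (ApiException e) {
      System.out.println(e.status + ": " + e.getMessage());
    }
  }
}
```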



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -916,11 +930,15 @@ void updateIdealStateOnSegmentCompletion(String realtimeTableName, String commit
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
       updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
-          newSegmentName, segmentAssignment, instancePartitionsMap);
+          isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);

Review Comment:
   This method is called with `newSegmentName` set to `null` if the table is being paused, right? Why check the ideal state again?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3613,7 +3613,27 @@ public Pair<String, Integer> invokeControllerPeriodicTask(String tableName, Stri
 
     LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId,
         messageCount);
-    return Pair.of(periodicTaskRequestId, messageCount);
+    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0);
+  }
+
+  public static class PeriodicTaskInvocationResponse {

Review Comment:
   Same with this class. It is returned externally, so move it to some api (or perhaps spi) package. Not sure what convention we are following.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1399,4 +1417,120 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
       }
     }
   }
+
+  /**
+   * Pause consumption on a table by
+   *   1) setting "isTablePaused" in ideal states to true and
+   *   2) sending force commit messages to servers
+   */
+  public PauseStatus pauseConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
+    Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+    return new PauseStatus(true, consumingSegments, consumingSegments.isEmpty() ? null : "Pause flag is set."

Review Comment:
   I would prefer returning a different class here than `PauseStatus`, since `PauseStatus` is an externally visible class, and we need to keep compatibility. For internal methods, we should be able to change things as needed
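
   Roughly what I have in mind, as a sketch only. `InternalPauseResult`, `PauseStatusResponse`, `toResponse` and the field names are hypothetical names chosen for illustration, not classes in the PR. The segment manager returns the internal type, and the REST resource is the only place that converts it to the externally visible one.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

class PauseTypesSketch {
  // Hypothetical internal result: never serialized to clients, so it can change freely.
  static class InternalPauseResult {
    final boolean paused;
    final Set<String> consumingSegments;

    InternalPauseResult(boolean paused, Set<String> consumingSegments) {
      this.paused = paused;
      this.consumingSegments = consumingSegments;
    }
  }

  // Hypothetical external response: its shape is the compatibility contract.
  static class PauseStatusResponse {
    final boolean pauseFlag;
    final Set<String> consumingSegments;
    final String description;

    PauseStatusResponse(boolean pauseFlag, Set<String> consumingSegments, String description) {
      this.pauseFlag = pauseFlag;
      this.consumingSegments = consumingSegments;
      this.description = description;
    }
  }

  // Conversion lives at the API boundary; mirrors the null-when-empty description in the diff.
  static PauseStatusResponse toResponse(InternalPauseResult result) {
    String description = result.consumingSegments.isEmpty()
        ? null
        : "Pause flag is set. Consuming segments are being committed.";
    Set<String> segments = Collections.unmodifiableSet(new TreeSet<>(result.consumingSegments));
    return new PauseStatusResponse(result.paused, segments, description);
  }

  public static void main(String[] args) {
    InternalPauseResult internal = new InternalPauseResult(true, Collections.singleton("seg__0__1"));
    PauseStatusResponse response = toResponse(internal);
    System.out.println(response.pauseFlag + " " + response.consumingSegments + " " + response.description);
  }
}
```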



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1399,4 +1417,120 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
       }
     }
   }
+
+  /**
+   * Pause consumption on a table by
+   *   1) setting "isTablePaused" in ideal states to true and
+   *   2) sending force commit messages to servers
+   */
+  public PauseStatus pauseConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
+    Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+    return new PauseStatus(true, consumingSegments, consumingSegments.isEmpty() ? null : "Pause flag is set."
+        + " Consuming segments are being committed."
+        + " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.");
+  }
+
+  /**
+   * Resume consumption on a table by
+   *   1) setting "isTablePaused" in ideal states to false and
+   *   2) triggering segment validation job to create new consuming segments in ideal states
+   */
+  public PauseStatus resumeConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);
+
+    // trigger realtime segment validation job to resume consumption
+    Map<String, String> taskProperties = new HashMap<>();
+    taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");

Review Comment:
   Create an issue to see if this is still needed. Now that we have pause/resume, I see no need for a user to delete a consuming segment. We can disable such deletion in the API.



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java:
##########
@@ -169,6 +173,34 @@ public HelixTaskResult handleMessage()
     }
   }
 
+  private class ForceCommitMessageHandler extends DefaultMessageHandler {
+
+    private String _tableName;
+    private Set<String> _segmentNames;
+
+    public ForceCommitMessageHandler(ForceCommitMessage forceCommitMessage, ServerMetrics metrics,
+        NotificationContext ctx) {
+      super(forceCommitMessage, metrics, ctx);
+      _tableName = forceCommitMessage.getTableName();
+      _segmentNames = forceCommitMessage.getSegmentNames();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      _logger.info("Handling force commit message for table {} segments {}", _tableName, _segmentNames);
+      try {
+        _instanceDataManager.forceCommit(_tableName, _segmentNames);
+        helixTaskResult.setSuccess(true);
+      } catch (Exception e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1);
+        Utils.rethrowException(e);

Review Comment:
   Not sure what helix does with exceptions thrown. Hopefully it does not retry. 
   Another way is to just log the exception and move on.
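
   A sketch of the log-and-move-on variant, with plain Java stand-ins so it runs on its own. `TaskResult` and `Committer` are hypothetical placeholders for the Helix task result and the instance data manager, and `java.util.logging` is used only to keep the example self-contained.

```java
import java.util.Collections;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

class ForceCommitHandlerSketch {
  private static final Logger LOGGER = Logger.getLogger(ForceCommitHandlerSketch.class.getName());

  // Hypothetical stand-in for the task result returned to the messaging layer.
  static class TaskResult {
    boolean success;
    String message;
  }

  // Hypothetical stand-in for the call that asks consuming segments to commit.
  interface Committer {
    void forceCommit(String tableName, Set<String> segmentNames) throws Exception;
  }

  static TaskResult handle(Committer committer, String tableName, Set<String> segmentNames) {
    TaskResult result = new TaskResult();
    try {
      committer.forceCommit(tableName, segmentNames);
      result.success = true;
    } catch (Exception e) {
      // Log and report failure instead of rethrowing, so the caller never sees an exception.
      LOGGER.log(Level.SEVERE, "Force commit failed for table " + tableName + " segments " + segmentNames, e);
      result.success = false;
      result.message = e.getMessage();
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> segments = Collections.singleton("seg__0__1");
    TaskResult ok = handle((table, segs) -> { }, "events_REALTIME", segments);
    TaskResult failed = handle((table, segs) -> {
      throw new IllegalStateException("boom");
    }, "events_REALTIME", segments);
    System.out.println(ok.success + " " + failed.success + " " + failed.message);
  }
}
```

   Whether marking the result as failed is enough, or whether the messaging layer reacts to that too, would still need checking against the Helix behaviour.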



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -552,12 +564,12 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
             .collect(Collectors.toSet());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
 
-    // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
-    // segment metadata
     String newConsumingSegmentName = null;
-    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-    long newSegmentCreationTimeMs = getCurrentTimeMs();
-    if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+    if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+      // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
+      // segment metadata
+      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);

Review Comment:
   nice refactor moving these in



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java:
##########
@@ -169,6 +170,39 @@ public void deleteSegments(String tableName, TableType tableType)
     }
   }
 
+  public PinotLLCRealtimeSegmentManager.PauseStatus pauseConsumption(String tableName)
+      throws IOException {
+    try {
+      SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(
+          _controllerRequestURLBuilder.forPauseConsumption(tableName)).toURI(), null));
+      return JsonUtils.stringToObject(response.getResponse(), PinotLLCRealtimeSegmentManager.PauseStatus.class);

Review Comment:
   We don't want `ControllerRequestClient` to depend on `PinotLLCRealtimeSegmentManager`, right? Should we move `PauseStatus` to the controller/api package? We may rename or move `PinotLLCRealtimeSegmentManager` at some point, and we don't want to rebuild clients just because of that. Perhaps there are other such uses in this file; we should pay attention and fix those as well (in a separate PR/issue, of course), but we can stop introducing new ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

