Posted to commits@pinot.apache.org by "snleee (via GitHub)" <gi...@apache.org> on 2023/02/10 06:36:07 UTC

[GitHub] [pinot] snleee commented on a diff in pull request #10261: [minion] check segment existency and fail early

snleee commented on code in PR #10261:
URL: https://github.com/apache/pinot/pull/10261#discussion_r1102323263


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -154,25 +156,33 @@ public void setMinionEventObserver(MinionEventObserver observer) {
   @Override
   public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
       throws Exception {
-    preProcess(pinotTaskConfig);
-    _pinotTaskConfig = pinotTaskConfig;
-    _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
-    String taskType = pinotTaskConfig.getTaskType();
+    // check whether all segments to process exist in the table, if not, terminate early to avoid wasting computing
+    // resources
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
-    String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
-    String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
     AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+    Set<String> nonExistentSegmentNames = SegmentConversionUtils.extractNonExistentSegments(tableNameWithType,
+        FileUploadDownloadClient.extractBaseURI(new URI(uploadURL)),
+        Arrays.asList(inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR)), authProvider);
+    if (!CollectionUtils.isEmpty(nonExistentSegmentNames)) {
+      throw new RuntimeException(String.format("Segments to process: %s do not exist in table: %s",

Review Comment:
   Let's swap the order of the segment list and the table name in this message. The list of segments can be quite long, so it is better to put the table name first and the long segment list after it. Otherwise, the table name is harder to find in the message.
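
To illustrate the ordering suggested above, here is a minimal sketch of a message builder that puts the table name first and the segment list last. The class name `MissingSegmentsMessage`, the method name, and the exact wording of the message are assumptions made for illustration; they are not part of the PR.

```java
import java.util.Collection;

// Hypothetical helper, not part of the PR: shows one way to order the message.
final class MissingSegmentsMessage {
  private MissingSegmentsMessage() {
  }

  // The table name comes first so it is visible even when the segment list is very long.
  static String format(String tableNameWithType, Collection<String> nonExistentSegmentNames) {
    return String.format("Table: %s is missing %d segment(s) to process: %s", tableNameWithType,
        nonExistentSegmentNames.size(), nonExistentSegmentNames);
  }
}
```

In `executeTask`, the existing `throw new RuntimeException(String.format(...))` could keep its current shape and only swap the two `%s` arguments; the helper above is just one possible form.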



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -154,25 +156,33 @@ public void setMinionEventObserver(MinionEventObserver observer) {
   @Override
   public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
       throws Exception {
-    preProcess(pinotTaskConfig);
-    _pinotTaskConfig = pinotTaskConfig;
-    _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
-    String taskType = pinotTaskConfig.getTaskType();
+    // check whether all segments to process exist in the table, if not, terminate early to avoid wasting computing

Review Comment:
   This is not directly related to this PR, but can we also add a table-level timeout for `endSegmentReplace` if possible?
   
   Currently, we only have the minion instance-level config `_minionConf.getEndReplaceSegmentsTimeoutMs()`. It would be very useful to us if the same setting could also be specified in the task config within the table config. We can pass the value through the `context` object.
   
   ```
     protected void postUploadSegments(SegmentUploadContext context)
         throws Exception {
       // Update the segment lineage to indicate that the segment replacement is done.
       _eventObserver.notifyProgress(_pinotTaskConfig,
           "Finishing uploading segments: " + context.getSegmentConversionResults().size());
       if (context.isReplaceSegmentsEnabled()) {
         String lineageEntryId = (String) context.getCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID);
         SegmentConversionUtils.endSegmentReplace(context.getTableNameWithType(), context.getUploadURL(), lineageEntryId,
             _minionConf.getEndReplaceSegmentsTimeoutMs(), context.getAuthProvider());
       }
     }
   ```
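
One way the table-level override could work is sketched below: look for a timeout in the task configs and fall back to the minion instance-level value when it is absent. The key name `endReplaceSegmentsTimeoutMs`, the class name, and the `int` type are assumptions for illustration only; none of them is an existing Pinot constant or API.

```java
import java.util.Map;

// Hypothetical helper, not part of Pinot: resolves the timeout for ending a segment replacement.
final class EndReplaceSegmentsTimeout {
  // Assumed task-config key; the real key name would be decided in the actual change.
  static final String TIMEOUT_MS_KEY = "endReplaceSegmentsTimeoutMs";

  private EndReplaceSegmentsTimeout() {
  }

  // Prefers the task-level value; falls back to the minion instance-level value when unset.
  // A non-numeric task-level value raises NumberFormatException so misconfiguration fails loudly.
  static int resolve(Map<String, String> taskConfigs, int minionLevelTimeoutMs) {
    String value = taskConfigs.get(TIMEOUT_MS_KEY);
    if (value == null || value.trim().isEmpty()) {
      return minionLevelTimeoutMs;
    }
    return Integer.parseInt(value.trim());
  }
}
```

Under these assumptions, the resolved value would be stored on the `context` object and passed to `endSegmentReplace` in place of `_minionConf.getEndReplaceSegmentsTimeoutMs()`.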



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java:
##########
@@ -61,6 +65,39 @@ public class SegmentConversionUtils {
   private SegmentConversionUtils() {
   }
 
+  /**
+   * Extract non-existent segments from the given list of segments
+   * @param tableNameWithType a table name with type
+   * @param controllerBaseURI the controller base URI
+   * @param segmentNames a list of segments to check
+   * @param authProvider a {@link AuthProvider}
+   * @return a set of non-existent segment names
+   * @throws Exception when there are exceptions checking whether the given list of segments all exist or not
+   */
+  public static Set<String> extractNonExistentSegments(String tableNameWithType, URI controllerBaseURI,

Review Comment:
   Instead of this, I think it's probably better to have a function that returns the list of segments for the table.
   
   We can then simply compute `nonExistentSegments` as follows:
   
   ```
   inputSegmentsListFromTaskConfig.removeAll(SegmentConversionUtils.getSegmentsForTable(xxx))
   ```
   
   I feel that `getSegmentsForTable` will likely be used more frequently.
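
The set difference described above can be sketched as follows. The segments already in the table are passed in as a plain collection, standing in for the result of the proposed `getSegmentsForTable` (which does not exist yet, so its signature is not assumed here). The class and method names are hypothetical.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PR: computes input segments that are missing from the table.
final class NonExistentSegments {
  private NonExistentSegments() {
  }

  static Set<String> compute(List<String> inputSegmentNames, Collection<String> segmentsInTable) {
    // Copy into a mutable set first; the copy preserves input order and leaves the caller's list untouched.
    Set<String> missing = new LinkedHashSet<>(inputSegmentNames);
    missing.removeAll(segmentsInTable);
    return missing;
  }
}
```

The copy matters because the PR builds the input list with `Arrays.asList(inputSegmentNames.split(...))`, which returns a fixed-size list: calling `removeAll` on it directly throws `UnsupportedOperationException` as soon as an element has to be removed.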



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
