Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/05 18:09:00 UTC

[GitHub] [pinot] snleee commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1039916803


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,84 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+          SegmentPushUtils.pushSegments(

Review Comment:
   This will break backward compatibility. We should keep using `SegmentConversionUtils.uploadSegment()` when `TAR` push is used.
   
   `SegmentConversionUtils.uploadSegment()` reads some of its retry settings from the task config, and we do rely on them:
   
   ```
       String maxNumAttemptsConfigStr = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
       int maxNumAttemptsFromConfig =
           maxNumAttemptsConfigStr != null ? Integer.parseInt(maxNumAttemptsConfigStr) : DEFAULT_MAX_NUM_ATTEMPTS;
       int maxNumAttempts = Math.max(maxNumAttemptsFromConfig, uriProvider.numAddresses());
       LOGGER.info("Retry uploading for {} times. Max num attempts from pinot minion config: {}, number of IP addresses "
           + "for upload URI: {}", maxNumAttempts, maxNumAttemptsFromConfig, uriProvider.numAddresses());
       String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
       long initialRetryDelayMs =
           initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
       String retryScaleFactorConfig = configs.get(MinionConstants.RETRY_SCALE_FACTOR_KEY);
       double retryScaleFactor =
           retryScaleFactorConfig != null ? Double.parseDouble(retryScaleFactorConfig) : DEFAULT_RETRY_SCALE_FACTOR;
       RetryPolicy retryPolicy =
           RetryPolicies.exponentialBackoffRetryPolicy(maxNumAttempts, initialRetryDelayMs, retryScaleFactor);
   ```
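
To make the compatibility concern concrete, here is a minimal, self-contained sketch of the behaviour the snippet above relies on: each retry setting is read from the task config and falls back to a default when the key is absent. The class name `RetryConfigSketch`, the key strings, and the default values are assumptions for illustration only; the real ones live in `MinionConstants` and `SegmentConversionUtils`. The delay schedule is a simplified, deterministic stand-in, not Pinot's `RetryPolicies.exponentialBackoffRetryPolicy`, which may behave differently (for example by adding jitter).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: key names and defaults below are assumptions,
// not the values defined in Pinot's MinionConstants.
final class RetryConfigSketch {
  static final String MAX_NUM_ATTEMPTS_KEY = "maxNumAttempts";
  static final String INITIAL_RETRY_DELAY_MS_KEY = "initialRetryDelayMs";
  static final String RETRY_SCALE_FACTOR_KEY = "retryScaleFactor";

  static final int DEFAULT_MAX_NUM_ATTEMPTS = 5;
  static final long DEFAULT_INITIAL_RETRY_DELAY_MS = 1000L;
  static final double DEFAULT_RETRY_SCALE_FACTOR = 2.0;

  // Use the configured attempt count if present, but never fewer attempts
  // than there are upload addresses to try.
  static int resolveMaxNumAttempts(Map<String, String> configs, int numAddresses) {
    String value = configs.get(MAX_NUM_ATTEMPTS_KEY);
    int fromConfig = value != null ? Integer.parseInt(value) : DEFAULT_MAX_NUM_ATTEMPTS;
    return Math.max(fromConfig, numAddresses);
  }

  static long resolveInitialRetryDelayMs(Map<String, String> configs) {
    String value = configs.get(INITIAL_RETRY_DELAY_MS_KEY);
    return value != null ? Long.parseLong(value) : DEFAULT_INITIAL_RETRY_DELAY_MS;
  }

  static double resolveRetryScaleFactor(Map<String, String> configs) {
    String value = configs.get(RETRY_SCALE_FACTOR_KEY);
    return value != null ? Double.parseDouble(value) : DEFAULT_RETRY_SCALE_FACTOR;
  }

  // Simplified schedule: the wait before retry i is initialDelayMs * scaleFactor^i.
  // There is one wait between each pair of consecutive attempts.
  static long[] backoffDelaysMs(int maxNumAttempts, long initialDelayMs, double scaleFactor) {
    long[] delays = new long[Math.max(0, maxNumAttempts - 1)];
    double delay = initialDelayMs;
    for (int i = 0; i < delays.length; i++) {
      delays[i] = (long) delay;
      delay *= scaleFactor;
    }
    return delays;
  }

  public static void main(String[] args) {
    Map<String, String> configs = new HashMap<>();
    configs.put(MAX_NUM_ATTEMPTS_KEY, "3");
    int attempts = resolveMaxNumAttempts(configs, 1);
    long[] delays = backoffDelaysMs(
        attempts, resolveInitialRetryDelayMs(configs), resolveRetryScaleFactor(configs));
    System.out.println("attempts=" + attempts + ", waits=" + java.util.Arrays.toString(delays));
  }
}
```

The point of the sketch is that a push path which hardcodes its attempt count and retry interval ignores these task-level overrides, so any deployment that tuned them would silently lose that tuning for `TAR` push.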



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org