Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/17 16:48:05 UTC

[GitHub] [pinot] KKcorps opened a new pull request, #9825: Allow segment upload via Metadata in MergeRollup Minion task

KKcorps opened a new pull request, #9825:
URL: https://github.com/apache/pinot/pull/9825



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1025588073


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -571,6 +577,40 @@ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> sel
     return pinotTaskConfigs;
   }
 
+  private Map<String, String> getPushTaskConfig(Map<String, String> batchConfigMap, String downloadUrls) {
+    try {
+      String[] downloadURLList = downloadUrls.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
+      if (downloadURLList.length > 0) {
+        String downloadUrl = downloadURLList[0];
+        URI downloadURI = URI.create(downloadUrl);
+        URI outputDirURI = null;
+        if (!downloadURI.getScheme().contentEquals("http")) {

Review Comment:
   Because, in the case of a metadata push, the segment needs to be copied to a location that the controller can access.
   If the user doesn't set the outputDirUri, the best place to copy it is the deep store. The download URL refers to a deep store path, so it is used to derive the output directory.
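
The derivation described in this comment can be sketched as follows. This is a minimal, hypothetical helper (`OutputDirSketch` and `deriveOutputDir` are not Pinot names): it takes the parent directory of the download URL unless the scheme is HTTP. Treating `https` and a missing scheme the same way as `http` is an assumption added here; the diff under review only compares against `"http"`.

```java
import java.net.URI;
import java.util.Optional;

/** Hypothetical helper, not part of Pinot: mirrors the scheme check and parent-directory step. */
final class OutputDirSketch {
  private OutputDirSketch() {
  }

  /**
   * Returns the parent directory of the download URL when it looks like a deep store path,
   * or an empty Optional when no deep store location can be derived from it.
   */
  static Optional<URI> deriveOutputDir(String downloadUrl) {
    URI downloadUri = URI.create(downloadUrl);
    String scheme = downloadUri.getScheme();
    // Assumption: http(s) URLs point at the controller rather than at the deep store.
    if (scheme == null || scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https")) {
      return Optional.empty();
    }
    int lastSlash = downloadUrl.lastIndexOf('/');
    if (lastSlash < 0) {
      return Optional.empty();
    }
    // Drop the segment file name and keep the directory that contains it.
    return Optional.of(URI.create(downloadUrl.substring(0, lastSlash)));
  }
}
```

Returning an `Optional` keeps the "no derivable directory" case explicit, so the caller can fall back to another location instead of handling a null URI.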





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1036047120


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -97,6 +97,49 @@ public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix,
   public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());

Review Comment:
   We need to pass this custom header in the new flow, and the older methods don't allow passing additional headers:
   ```java
   // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
   SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
       getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult);
   Header segmentZKMetadataCustomMapModifierHeader =
       new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
           segmentZKMetadataCustomMapModifier.toJsonString());
   ```                
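
As an illustration of why an overload is needed here, the sketch below keeps an older-style entry point that accepts no extra headers and adds an overload that appends caller-supplied ones. Everything in it is hypothetical (`PushOverloadSketch`, `buildRequestHeaders`, and the header names are stand-ins, not the `SegmentPushUtils` API); it only shows the shape of the change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a push utility; names and header values are illustrative only. */
final class PushOverloadSketch {
  private PushOverloadSketch() {
  }

  /** Older-style entry point: callers cannot supply extra headers. */
  static List<Map.Entry<String, String>> buildRequestHeaders(String authToken) {
    return buildRequestHeaders(authToken, List.of());
  }

  /** Newer overload: same logic, with caller-supplied headers appended to the request. */
  static List<Map.Entry<String, String>> buildRequestHeaders(String authToken,
      List<Map.Entry<String, String>> extraHeaders) {
    List<Map.Entry<String, String>> headers = new ArrayList<>();
    headers.add(Map.entry("Authorization", authToken));
    headers.addAll(extraHeaders);
    return headers;
  }
}
```

Delegating the old signature to the new one with an empty list keeps existing callers unchanged while letting the minion task attach its custom map modifier header.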





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1036053646


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,98 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case URI:

Review Comment:
   Agreed. Removed it now.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -82,6 +84,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
 
   private static final String DEFAULT_BUCKET_PERIOD = "1d";
   private static final String DEFAULT_BUFFER_PERIOD = "2d";
+  private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =

Review Comment:
   Removed.





[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1025503700


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +255,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);

Review Comment:
   I feel that we should check the push mode before moving the segment, since this intermediate step is not needed for every push mode.
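
One way to read this suggestion is as a guard that runs before any file is moved. The sketch below is an assumption-laden illustration, not the PR's code: the `push.mode` key, the TAR default when the key is absent, and the class and method names are all hypothetical. It relies only on the point made elsewhere in this thread that a metadata push needs the segment in a location the controller can reach.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch; the config key and the TAR default are assumptions. */
final class PushModeGateSketch {
  enum PushMode { TAR, METADATA }

  // Assumed config key, for illustration only.
  static final String PUSH_MODE_KEY = "push.mode";

  private PushModeGateSketch() {
  }

  /** Parses the configured push mode, falling back to TAR when none is set. */
  static PushMode resolvePushMode(Map<String, String> taskConfigs) {
    String raw = taskConfigs.get(PUSH_MODE_KEY);
    return raw == null ? PushMode.TAR : PushMode.valueOf(raw.toUpperCase(Locale.ROOT));
  }

  /** Only a metadata push needs the tarball copied somewhere the controller can read it. */
  static boolean needsMoveToOutputFs(Map<String, String> taskConfigs) {
    return resolvePushMode(taskConfigs) == PushMode.METADATA;
  }
}
```

A caller would invoke `needsMoveToOutputFs` first and skip the copy entirely for a plain tar push.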



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -571,6 +577,40 @@ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> sel
     return pinotTaskConfigs;
   }
 
+  private Map<String, String> getPushTaskConfig(Map<String, String> batchConfigMap, String downloadUrls) {
+    try {
+      String[] downloadURLList = downloadUrls.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
+      if (downloadURLList.length > 0) {
+        String downloadUrl = downloadURLList[0];
+        URI downloadURI = URI.create(downloadUrl);
+        URI outputDirURI = null;
+        if (!downloadURI.getScheme().contentEquals("http")) {

Review Comment:
   Maybe I misunderstand something, but I don't fully follow the logic here. Why is the outputDir decided by the downloadUrl?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -571,6 +577,40 @@ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> sel
     return pinotTaskConfigs;
   }
 
+  private Map<String, String> getPushTaskConfig(Map<String, String> batchConfigMap, String downloadUrls) {
+    try {
+      String[] downloadURLList = downloadUrls.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
+      if (downloadURLList.length > 0) {
+        String downloadUrl = downloadURLList[0];
+        URI downloadURI = URI.create(downloadUrl);
+        URI outputDirURI = null;
+        if (!downloadURI.getScheme().contentEquals("http")) {
+          String outputDir = downloadUrl.substring(0, downloadUrl.lastIndexOf("/"));
+          outputDirURI = URI.create(outputDir);
+        }
+        String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
+
+        Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(batchConfigMap);
+        if (outputDirURI != null) {
+          URI outputSegmentDirURI = SegmentGenerationUtils.getRelativeOutputPath(
+              outputDirURI, downloadURI, outputDirURI);
+          singleFileGenerationTaskConfig.put(
+              BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
+        }
+        if ((outputDirURI == null) || (pushMode == null)) {

Review Comment:
   When the push mode is metadata push and users have not set OUTPUT_SEGMENT_DIR_URI, we should default to the location `data_dir/tablename`. Most of the time, users will not need to set it.
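
The fallback proposed in this comment could look roughly like the sketch below. It is hypothetical: the `outputSegmentDirURI` key, the class name, and the way the data directory is passed in are assumptions for illustration, and the real change may resolve the data directory differently.

```java
import java.net.URI;
import java.util.Map;

/** Hypothetical sketch of the proposed default; key name and inputs are assumptions. */
final class DefaultOutputDirSketch {
  // Assumed config key, for illustration only.
  static final String OUTPUT_SEGMENT_DIR_KEY = "outputSegmentDirURI";

  private DefaultOutputDirSketch() {
  }

  /** Uses the user-supplied directory when present, otherwise falls back to dataDir/tableName. */
  static URI resolveOutputDir(Map<String, String> taskConfigs, URI dataDir, String tableName) {
    String configured = taskConfigs.get(OUTPUT_SEGMENT_DIR_KEY);
    if (configured != null && !configured.isEmpty()) {
      return URI.create(configured);
    }
    String base = dataDir.toString();
    // Avoid a doubled or missing separator between the data directory and the table name.
    if (!base.endsWith("/")) {
      base += "/";
    }
    return URI.create(base + tableName);
  }
}
```

With this shape, an explicit setting always wins and the default only applies when the user has configured nothing.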



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/TaskUtils.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import java.net.URI;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+public class TaskUtils {
+  private TaskUtils() {
+  }
+
+  static PinotFS getInputPinotFS(Map<String, String> taskConfigs, URI fileURI)

Review Comment:
   It seems that this is not used anywhere?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionPushUtils.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MinionPushUtils implements Serializable {

Review Comment:
   Just curious: why don't we reuse `org.apache.pinot.segment.local.utils.SegmentPushUtils` directly?





[GitHub] [pinot] codecov-commenter commented on pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9825:
URL: https://github.com/apache/pinot/pull/9825#issuecomment-1321919289

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9825?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9825](https://codecov.io/gh/apache/pinot/pull/9825?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4486ae8) into [master](https://codecov.io/gh/apache/pinot/commit/e307106a59646a8e1d9475baabe66fa3cde09490?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e307106) will **decrease** coverage by `12.23%`.
   > The diff coverage is `12.61%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9825       +/-   ##
   =============================================
   - Coverage     28.06%   15.82%   -12.24%     
   - Complexity       53      175      +122     
   =============================================
     Files          1949     1912       -37     
     Lines        104632   102792     -1840     
     Branches      15847    15627      -220     
   =============================================
   - Hits          29362    16264    -13098     
   - Misses        72395    85330    +12935     
   + Partials       2875     1198     -1677     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests2 | `15.82% <12.61%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9825?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `3.01% <0.00%> (-86.84%)` | :arrow_down: |
   | [...nandpush/SegmentGenerationAndPushTaskExecutor.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudGdlbmVyYXRpb25hbmRwdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (-67.89%)` | :arrow_down: |
   | [...andpush/SegmentGenerationAndPushTaskGenerator.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudGdlbmVyYXRpb25hbmRwdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tHZW5lcmF0b3IuamF2YQ==) | `2.15% <0.00%> (-31.73%)` | :arrow_down: |
   | [...he/pinot/segment/local/utils/SegmentPushUtils.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91dGlscy9TZWdtZW50UHVzaFV0aWxzLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...che/pinot/plugin/minion/tasks/MinionTaskUtils.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvTWluaW9uVGFza1V0aWxzLmphdmE=) | `21.62% <50.00%> (ø)` | |
   | [...on/tasks/mergerollup/MergeRollupTaskGenerator.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vyb2xsdXAvTWVyZ2VSb2xsdXBUYXNrR2VuZXJhdG9yLmphdmE=) | `75.22% <100.00%> (-1.75%)` | :arrow_down: |
   | [...gments/RealtimeToOfflineSegmentsTaskGenerator.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcmVhbHRpbWV0b29mZmxpbmVzZWdtZW50cy9SZWFsdGltZVRvT2ZmbGluZVNlZ21lbnRzVGFza0dlbmVyYXRvci5qYXZh) | `93.43% <100.00%> (+22.84%)` | :arrow_up: |
   | [...src/main/java/org/apache/pinot/sql/FilterKind.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcWwvRmlsdGVyS2luZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...n/java/org/apache/pinot/core/data/table/Table.java](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1RhYmxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1020 more](https://codecov.io/gh/apache/pinot/pull/9825/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038166185


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,87 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case METADATA:
+          try {
+            Map<String, String> segmentUriToTarPathMap =
+                SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
+      }
+    }
+  }
+
+  private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
+      PushJobSpec pushJobSpec) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+    PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    spec.setAuthToken(taskConfigs.get(BatchConfigProperties.AUTH_TOKEN));
+
+    return spec;
+  }
+
+  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
+      throws Exception {
+    if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {

Review Comment:
   Makes sense. Removed this. 





[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1035630266


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -97,6 +97,49 @@ public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix,
   public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());

Review Comment:
   Just curious: why do we need to create those 3 new methods? It seems that there is no logic change.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,98 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case URI:

Review Comment:
   I feel that we probably should not allow URI mode; it does not make much sense to use URI mode in minion tasks. WDYT?
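   
   A minimal sketch of what rejecting URI mode could look like, assuming the push mode arrives as a plain string. `PushType` and `PushModeValidator` are hypothetical stand-ins written for illustration, not the Pinot classes:
   
   ```java
   import java.util.Locale;

   // Hypothetical stand-in for BatchConfigProperties.SegmentPushType; not the Pinot enum.
   enum PushType { TAR, URI, METADATA }

   final class PushModeValidator {
     private PushModeValidator() { }

     // Parses the configured push mode and accepts only TAR and METADATA.
     static PushType parseForMinion(String pushMode) {
       if (pushMode == null) {
         throw new IllegalArgumentException("Push mode is not set");
       }
       PushType type = PushType.valueOf(pushMode.trim().toUpperCase(Locale.ROOT));
       switch (type) {
         case TAR:
         case METADATA:
           return type;
         default:
           // URI (and anything added later) is rejected explicitly.
           throw new UnsupportedOperationException("Unsupported push mode for minion task: " + pushMode);
       }
     }
   }
   ```
   
   With this shape, `parseForMinion("uri")` fails fast instead of reaching a push path that has no clear meaning for a minion task.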



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -82,6 +84,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
 
   private static final String DEFAULT_BUCKET_PERIOD = "1d";
   private static final String DEFAULT_BUFFER_PERIOD = "2d";
+  private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =

Review Comment:
   It seems that this variable is not used?



[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1027786549


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -571,6 +577,40 @@ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> sel
     return pinotTaskConfigs;
   }
 
+  private Map<String, String> getPushTaskConfig(Map<String, String> batchConfigMap, String downloadUrls) {
+    try {
+      String[] downloadURLList = downloadUrls.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
+      if (downloadURLList.length > 0) {
+        String downloadUrl = downloadURLList[0];
+        URI downloadURI = URI.create(downloadUrl);
+        URI outputDirURI = null;
+        if (!downloadURI.getScheme().contentEquals("http")) {

Review Comment:
   That makes more sense.



[GitHub] [pinot] snleee commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1026045441


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
+        LOGGER.info("Moved generated segment from [{}] to location: [{}]",

Review Comment:
   What happens if the Minion gets killed (e.g. by a restart) after it finishes moving the segment to outputPinotFS but before it finishes the metadata push to the controller?
   
   I think this can leave files in deep storage without any Pinot-side segment metadata pointing to them. This will become a problem unless we have a cleanup mechanism. Do we have any design/solution to address this?
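   
   To make the failure window concrete, here is a rough sketch using only `java.nio.file`, under the assumption that the output location behaves like a local directory. `MoveThenPushSketch` and `MetadataPusher` are hypothetical names, not part of Pinot:
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   final class MoveThenPushSketch {
     // Hypothetical callback standing in for the metadata push to the controller.
     interface MetadataPusher {
       void push(Path segmentTar) throws Exception;
     }

     private MoveThenPushSketch() { }

     // Copies the tar file to the output directory, pushes metadata,
     // and deletes the copy again if the push fails.
     static Path moveThenPush(Path localTar, Path outputDir, MetadataPusher pusher) throws Exception {
       Files.createDirectories(outputDir);
       Path target = outputDir.resolve(localTar.getFileName());
       Files.copy(localTar, target, StandardCopyOption.REPLACE_EXISTING);
       try {
         pusher.push(target);
         return target;
       } catch (Exception e) {
         // Best effort: without this, the copy stays behind with no metadata pointing at it.
         try {
           Files.deleteIfExists(target);
         } catch (IOException cleanupFailure) {
           e.addSuppressed(cleanupFailure);
         }
         throw e;
       }
     }
   }
   ```
   
   This only covers a push that fails while the process is still alive. If the process is killed between the copy and the push, the `catch` block never runs, so some separate cleanup of unreferenced files would still be needed.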



[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1036044210


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -67,7 +72,35 @@ static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
     return PinotFSFactory.create(fileURIScheme);
   }
 
-  static PinotFS getLocalPinotFs() {
+  public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
+      ClusterInfoAccessor clusterInfoAccessor) {
+    try {
+      URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);

Review Comment:
   It is not really used in the case when pushMode is TAR.
   
   ```java
   if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
       != BatchConfigProperties.SegmentPushType.TAR) {
     outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile);
     LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
         outputSegmentTarURI);
   } else {
     outputSegmentTarURI = convertedTarredSegmentFile.toURI();
   }
   
   ......
   case TAR:
     try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
       SegmentPushUtils.pushSegments(
           spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
     } catch (RetriableOperationException | AttemptsExceededException e) {
       throw new RuntimeException(e);
     }
     break;
   ```



[GitHub] [pinot] snleee commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1040096535


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,89 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult)
+      throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try {
+          File tarFile = new File(outputSegmentTarURI);
+          String segmentName = segmentConversionResult.getSegmentName();
+          String tableNameWithType = segmentConversionResult.getTableNameWithType();
+          String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+          SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName,
+              uploadURL, tarFile);
+        } catch (RetriableOperationException | AttemptsExceededException e) {

Review Comment:
   Why do we need to catch specific exceptions and wrap them in `RuntimeException` here? The top-level caller is declared as `executeTask() throws Exception`.
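   
   The difference can be shown with a small self-contained sketch. `UploadFailedException` is a hypothetical checked exception standing in for `RetriableOperationException`, and the method names are made up for illustration:
   
   ```java
   final class PropagateSketch {
     // Hypothetical checked exception; not a Pinot class.
     static class UploadFailedException extends Exception {
       UploadFailedException(String message) {
         super(message);
       }
     }

     private PropagateSketch() { }

     static void upload(boolean fail) throws UploadFailedException {
       if (fail) {
         throw new UploadFailedException("upload failed");
       }
     }

     // Wrapping: the caller sees RuntimeException and has to unwrap getCause().
     static void pushWrapped(boolean fail) {
       try {
         upload(fail);
       } catch (UploadFailedException e) {
         throw new RuntimeException(e);
       }
     }

     // Propagating: the method already declares throws Exception,
     // so the original exception reaches the caller unchanged.
     static void pushPropagating(boolean fail) throws Exception {
       upload(fail);
     }
   }
   ```
   
   Since the enclosing method already declares `throws Exception`, the second form needs no `try`/`catch` at all.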



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,89 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult)
+      throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try {
+          File tarFile = new File(outputSegmentTarURI);
+          String segmentName = segmentConversionResult.getSegmentName();
+          String tableNameWithType = segmentConversionResult.getTableNameWithType();
+          String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
+          SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName,
+              uploadURL, tarFile);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
+        }
+        break;
+      case METADATA:
+        if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+          URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+          try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+            Map<String, String> segmentUriToTarPathMap =
+                SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {

Review Comment:
   same here 



[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1025584865


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +255,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);

Review Comment:
   makes sense



[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038164688


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java:
##########
@@ -79,15 +81,18 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
   protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
   protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
   protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
+  protected final File _segmentDir4 = new File(_tempDir, "segmentDir4");
   protected final File _tarDir1 = new File(_tempDir, "tarDir1");
   protected final File _tarDir2 = new File(_tempDir, "tarDir2");
   protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+  protected final File _tarDir4 = new File(_tempDir, "tarDir4");
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _tarDir1, _tarDir2,
-        _tarDir3);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir,

Review Comment:
   fixed now.



[GitHub] [pinot] snleee commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1039916803


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,84 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+          SegmentPushUtils.pushSegments(

Review Comment:
   This will break backward compatibility. We should use `SegmentConversionUtils.uploadSegment()` when `TAR push` is being used.
   
   `SegmentConversionUtils.uploadSegment()` reads some settings from the task config, and we do rely on those.
   
   ```
       String maxNumAttemptsConfigStr = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
       int maxNumAttemptsFromConfig =
           maxNumAttemptsConfigStr != null ? Integer.parseInt(maxNumAttemptsConfigStr) : DEFAULT_MAX_NUM_ATTEMPTS;
       int maxNumAttempts = Math.max(maxNumAttemptsFromConfig, uriProvider.numAddresses());
       LOGGER.info("Retry uploading for {} times. Max num attempts from pinot minion config: {}, number of IP addresses "
           + "for upload URI: {}", maxNumAttempts, maxNumAttemptsFromConfig, uriProvider.numAddresses());
       String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
       long initialRetryDelayMs =
           initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
       String retryScaleFactorConfig = configs.get(MinionConstants.RETRY_SCALE_FACTOR_KEY);
       double retryScaleFactor =
           retryScaleFactorConfig != null ? Double.parseDouble(retryScaleFactorConfig) : DEFAULT_RETRY_SCALE_FACTOR;
       RetryPolicy retryPolicy =
           RetryPolicies.exponentialBackoffRetryPolicy(maxNumAttempts, initialRetryDelayMs, retryScaleFactor);
   ```
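   
   As a rough illustration of why those settings matter, the sketch below reads the same three kinds of values from a config map and derives the wait before each retry. The key names and default values are assumptions made for this example (the real ones live in `MinionConstants` and `SegmentConversionUtils`), and the fixed, non-randomized delays are a simplification that is not claimed to match `RetryPolicies.exponentialBackoffRetryPolicy`:
   
   ```java
   import java.util.Map;

   final class RetryConfigSketch {
     // Hypothetical keys and defaults, chosen for illustration only.
     static final String MAX_ATTEMPTS_KEY = "maxNumAttempts";
     static final String INITIAL_DELAY_MS_KEY = "initialRetryDelayMs";
     static final String SCALE_FACTOR_KEY = "retryScaleFactor";
     static final int DEFAULT_MAX_ATTEMPTS = 5;
     static final long DEFAULT_INITIAL_DELAY_MS = 200L;
     static final double DEFAULT_SCALE_FACTOR = 1.1;

     private RetryConfigSketch() { }

     // Returns the wait before each retry; attempt 1 runs immediately,
     // so there are (maxAttempts - 1) delays, each scaled from the previous one.
     static long[] backoffDelaysMs(Map<String, String> configs, int numAddresses) {
       String attemptsStr = configs.get(MAX_ATTEMPTS_KEY);
       int attemptsFromConfig = attemptsStr != null ? Integer.parseInt(attemptsStr) : DEFAULT_MAX_ATTEMPTS;
       // Try at least once per known address, as in the quoted snippet.
       int maxAttempts = Math.max(attemptsFromConfig, numAddresses);

       String delayStr = configs.get(INITIAL_DELAY_MS_KEY);
       long initialDelayMs = delayStr != null ? Long.parseLong(delayStr) : DEFAULT_INITIAL_DELAY_MS;

       String scaleStr = configs.get(SCALE_FACTOR_KEY);
       double scaleFactor = scaleStr != null ? Double.parseDouble(scaleStr) : DEFAULT_SCALE_FACTOR;

       long[] delays = new long[Math.max(maxAttempts - 1, 0)];
       double delay = initialDelayMs;
       for (int i = 0; i < delays.length; i++) {
         delays[i] = Math.round(delay);
         delay *= scaleFactor;
       }
       return delays;
     }
   }
   ```
   
   A TAR push that bypasses this path would silently ignore whatever values users have already put in their task configs, which is the compatibility concern.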



[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1039983878


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,84 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+          SegmentPushUtils.pushSegments(

Review Comment:
   done



[GitHub] [pinot] snleee commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1026920136


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
+        LOGGER.info("Moved generated segment from [{}] to location: [{}]",

Review Comment:
   We should not assume that a later retry will always come along to finish this particular task. Similarly, the controller can be down, which fails the metadata update. If the controller recovers only after the minion's retry attempts have failed, I think this will leave some files sitting in deep storage.



[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038165234


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,87 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case METADATA:
+          try {
+            Map<String, String> segmentUriToTarPathMap =
+                SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
+      }
+    }
+  }
+
+  private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
+      PushJobSpec pushJobSpec) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+    PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    spec.setAuthToken(taskConfigs.get(BatchConfigProperties.AUTH_TOKEN));
+
+    return spec;
+  }
+
+  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
+      throws Exception {
+    if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      return localSegmentTarFile.toURI();
+    }
+    URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName());
+      if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
+          outputSegmentDirURI)) {
+        LOGGER.warn("Not overwrite existing output segment tar file: {}", outputFileFS.exists(outputSegmentDirURI));

Review Comment:
   Makes sense. Now an exception will be thrown if overwrite is disabled.
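   
   A sketch of that behaviour against the local file system, assuming the check is made on the target tar file itself. `OverwriteGuardSketch` is a hypothetical helper and uses `java.nio.file` instead of `PinotFS`:
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   final class OverwriteGuardSketch {
     private OverwriteGuardSketch() { }

     // Copies the tar file into outputDir. When overwrite is false and the target
     // already exists, Files.copy throws FileAlreadyExistsException.
     static Path copyToOutput(Path localTar, Path outputDir, boolean overwrite) throws IOException {
       Files.createDirectories(outputDir);
       Path target = outputDir.resolve(localTar.getFileName());
       return overwrite
           ? Files.copy(localTar, target, StandardCopyOption.REPLACE_EXISTING)
           : Files.copy(localTar, target);
     }
   }
   ```
   
   Leaving the existence check to `Files.copy` avoids a separate `exists()` call that could race with another writer.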



[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038165869


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,87 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;

Review Comment:
   Not exactly failing, but enforcing that the dir is always present for metadata push. If it is not present, we log a warning and fall back to TAR push.
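
The fallback described in this comment could look roughly like the sketch below. This is an illustration only, not the PR's code: the class name, the method name, and both config key strings are hypothetical stand-ins, and the warning is written to stderr rather than through the project's logger.

```java
import java.util.Locale;
import java.util.Map;

final class MetadataFallbackSketch {
  // Hypothetical key names, chosen only for this illustration.
  static final String PUSH_MODE = "push.mode";
  static final String OUTPUT_DIR = "outputSegmentDirURI";

  // Returns the push mode to use: METADATA needs an output dir, otherwise fall back to TAR.
  static String effectivePushMode(Map<String, String> taskConfigs) {
    String mode = taskConfigs.getOrDefault(PUSH_MODE, "TAR");
    if ("METADATA".equalsIgnoreCase(mode) && !taskConfigs.containsKey(OUTPUT_DIR)) {
      System.err.println("WARN: no output dir configured for metadata push; falling back to TAR");
      return "TAR";
    }
    return mode.toUpperCase(Locale.ROOT);
  }
}
```

With this shape, a task that asks for metadata push without an output directory still completes, at the cost of uploading the full tar through the controller.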





[GitHub] [pinot] snleee commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1036255304


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,17 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        String pushMode = configs.get(BatchConfigProperties.PUSH_MODE);

Review Comment:
   Can we use a uniform naming convention? Are `pushMode` and `PushType` different things?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,87 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case METADATA:
+          try {
+            Map<String, String> segmentUriToTarPathMap =
+                SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
+      }
+    }
+  }
+
+  private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
+      PushJobSpec pushJobSpec) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+    PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    spec.setAuthToken(taskConfigs.get(BatchConfigProperties.AUTH_TOKEN));
+
+    return spec;
+  }
+
+  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
+      throws Exception {
+    if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      return localSegmentTarFile.toURI();
+    }
+    URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName());
+      if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
+          outputSegmentDirURI)) {
+        LOGGER.warn("Not overwrite existing output segment tar file: {}", outputFileFS.exists(outputSegmentDirURI));

Review Comment:
   What's the default value of `OVERWRITE_OUTPUT`? I think that it's `false`?
   
   If users don't pay attention to this config, they may get the false impression that the data was refreshed (because the minion job succeeded) when it actually was not.
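
On the default: `Boolean.parseBoolean` returns `false` for a `null` argument, so with the check as quoted in the diff, a task config that omits the key behaves as if overwrite were disabled. A minimal sketch of that behavior, where the class name and the key string are hypothetical stand-ins for the real constant:

```java
import java.util.HashMap;
import java.util.Map;

final class OverwriteDefaultSketch {
  // Hypothetical key name used only for this illustration.
  static final String OVERWRITE_OUTPUT = "overwriteOutput";

  static boolean isOverwriteEnabled(Map<String, String> taskConfigs) {
    // Map.get returns null for a missing key; Boolean.parseBoolean(null) is false.
    return Boolean.parseBoolean(taskConfigs.get(OVERWRITE_OUTPUT));
  }

  public static void main(String[] args) {
    Map<String, String> configs = new HashMap<>();
    System.out.println("missing key -> " + isOverwriteEnabled(configs));
    configs.put(OVERWRITE_OUTPUT, "true");
    System.out.println("explicit true -> " + isOverwriteEnabled(configs));
  }
}
```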



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java:
##########
@@ -79,15 +81,18 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
   protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
   protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
   protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
+  protected final File _segmentDir4 = new File(_tempDir, "segmentDir4");
   protected final File _tarDir1 = new File(_tempDir, "tarDir1");
   protected final File _tarDir2 = new File(_tempDir, "tarDir2");
   protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+  protected final File _tarDir4 = new File(_tempDir, "tarDir4");
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _tarDir1, _tarDir2,
-        _tarDir3);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir,

Review Comment:
   Is this based on the Pinot formatter?





[GitHub] [pinot] snleee commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1026040267


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -253,9 +271,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             TableNameBuilder.extractRawTableName(tableNameWithType));
         List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
 
-        SegmentConversionUtils
-            .uploadSegment(configs, httpHeaders, parameters, tableNameWithType, resultSegmentName, uploadURL,
-                convertedTarredSegmentFile);
+        pushSegment(tableNameWithType, pinotTaskConfig.getConfigs(), outputSegmentTarURI, httpHeaders, parameters);
+//        SegmentConversionUtils

Review Comment:
   Can you clean this up?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
+        LOGGER.info("Moved generated segment from [{}] to location: [{}]",

Review Comment:
   What happens if the Minion somehow gets killed (e.g. on restart) after it finishes moving the segment to outputPinotFS but before it finishes the metadata push to the controller?
   
   I think this can potentially leave files in the deep storage without any segment metadata. Do we have a design or solution to address this?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -253,9 +271,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             TableNameBuilder.extractRawTableName(tableNameWithType));
         List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
 
-        SegmentConversionUtils
-            .uploadSegment(configs, httpHeaders, parameters, tableNameWithType, resultSegmentName, uploadURL,
-                convertedTarredSegmentFile);
+        pushSegment(tableNameWithType, pinotTaskConfig.getConfigs(), outputSegmentTarURI, httpHeaders, parameters);

Review Comment:
   We can use `configs` instead of `pinotTaskConfig.getConfigs()`



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +255,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);

Review Comment:
   +1 
   As @zhtaoxiang mentioned, we should NOT move the segments to deep storage when we do the segment tar file push. In that case, the controller will move the segment to the deep storage during the upload phase.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);

Review Comment:
   We can use `configs` instead of `pinotTaskConfig.getConfigs()`





[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038378843


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,87 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;

Review Comment:
   👍🏻





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038094950


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,17 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        String pushMode = configs.get(BatchConfigProperties.PUSH_MODE);

Review Comment:
   It is the same thing. I borrowed the name from the rest of the code base. Can we do the rename in a separate PR, since it will touch other classes as well?





[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038386220


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -67,7 +73,40 @@ static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
     return PinotFSFactory.create(fileURIScheme);
   }
 
-  static PinotFS getLocalPinotFs() {
+  public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
+      ClusterInfoAccessor clusterInfoAccessor) {
+    try {
+      String pushMode = IngestionConfigUtils.getPushMode(taskConfigs);
+      Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(taskConfigs);
+      if (pushMode == null || pushMode.contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) {
+        singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+            BatchConfigProperties.SegmentPushType.TAR.toString());
+      } else {
+        URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);
+        String outputDirURIScheme = outputDirURI.getScheme();
+
+        if (!isLocalOutputDir(outputDirURIScheme)) {
+          singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputDirURI.toString());
+          singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);

Review Comment:
   nit: should we set push_mode to `metadata` when the provided one is `URI`? If so, we should also log this change.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +300,84 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+      case TAR:
+        try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+          SegmentPushUtils.pushSegments(
+              spec, pinotFS, Collections.singletonList(outputSegmentTarURI.toString()), headers, parameters);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
+        }
+        break;
+      case METADATA:
+        if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+          URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+          try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+            Map<String, String> segmentUriToTarPathMap =
+                SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          throw new RuntimeException("Output dir URI missing for metadata push");
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
+    }
+  }
+
+  private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
+      PushJobSpec pushJobSpec) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+    PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    spec.setAuthToken(taskConfigs.get(BatchConfigProperties.AUTH_TOKEN));
+
+    return spec;
+  }
+
+  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
+      throws Exception {
+    URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName());
+      if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
+          outputSegmentDirURI)) {

Review Comment:
   We should check for the existence of the segment (i.e. `outputSegmentDirURI + / + segmentName`), not of `outputSegmentDirURI` itself, right?
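
A sketch of the check being suggested, under stated assumptions: the class and method names are hypothetical, and the `exists` predicate stands in for a filesystem lookup such as the `outputFileFS.exists(...)` call in the diff. The helper also joins the directory and file name with an explicit `/`, which the quoted `URI.create(outputSegmentDirURI + localSegmentTarFile.getName())` line does not appear to do.

```java
import java.net.URI;
import java.util.function.Predicate;

final class SegmentExistsSketch {
  // Joins the directory URI and the tar file name with exactly one "/" separator.
  static URI segmentTarUri(URI outputSegmentDirUri, String tarFileName) {
    String dir = outputSegmentDirUri.toString();
    return URI.create(dir.endsWith("/") ? dir + tarFileName : dir + "/" + tarFileName);
  }

  // True when the copy should be skipped: overwrite is off and the segment tar itself
  // (not merely its parent directory) is already present.
  static boolean shouldSkipCopy(URI dirUri, String tarFileName, boolean overwrite, Predicate<URI> exists) {
    URI tarUri = segmentTarUri(dirUri, tarFileName);
    return !overwrite && exists.test(tarUri);
  }
}
```

Checking the directory instead would report "already exists" for every segment after the first one written to that table's output directory.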





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1036047120


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -97,6 +97,49 @@ public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix,
   public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
+    AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());

Review Comment:
   We need to pass this custom header in the new flow, and the older methods don't allow passing new headers:
   ```java
           // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
           SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
               getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult);
           Header segmentZKMetadataCustomMapModifierHeader =
               new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                   segmentZKMetadataCustomMapModifier.toJsonString());
   ```





[GitHub] [pinot] klsince commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
klsince commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1042529911


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,17 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        String pushMode = configs.get(BatchConfigProperties.PUSH_MODE);

Review Comment:
   The if-check at L259 and the switch-case at L317 assume `pushMode` is never null, but some of my test tasks didn't set it, so they failed with an NPE.
   
   So, should tasks always set `pushMode` explicitly going forward, or should we add null checks to be more defensive here? I assume we should do the latter to stay backward compatible. Thoughts?
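
One possible shape for the defensive option, as a sketch only: the enum below is a local stand-in for `BatchConfigProperties.SegmentPushType` (limited to the values named in this thread), and the class name, method name, and key string are hypothetical. A missing or blank value resolves to TAR, so existing tasks keep their previous behavior.

```java
import java.util.Locale;
import java.util.Map;

final class PushModeSketch {
  // Local stand-in for BatchConfigProperties.SegmentPushType; values taken from this thread.
  enum PushType { TAR, URI, METADATA }

  // Hypothetical key name used only for this illustration.
  static final String PUSH_MODE_KEY = "push.mode";

  // Resolves the push type, treating a missing or blank value as TAR instead of throwing an NPE.
  static PushType resolve(Map<String, String> taskConfigs) {
    String raw = taskConfigs.get(PUSH_MODE_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return PushType.TAR;
    }
    // Still throws IllegalArgumentException for an unrecognized, non-empty value.
    return PushType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
  }
}
```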





[GitHub] [pinot] KKcorps merged pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps merged PR #9825:
URL: https://github.com/apache/pinot/pull/9825




[GitHub] [pinot] klsince commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
klsince commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1042536215


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,17 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        String pushMode = configs.get(BatchConfigProperties.PUSH_MODE);

Review Comment:
   https://github.com/apache/pinot/pull/9935 Please help review it and check whether this makes sense.





[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1036364399


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,87 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;

Review Comment:
   Should we fail early if `outputSegmentDirURI` is null?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -276,6 +299,87 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
+      List<Header> headers, List<NameValuePair> parameters) throws Exception {
+    String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);
+
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
+    pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
+    pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+    pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+
+    SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);
+
+    URI outputSegmentDirURI = null;
+    if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+      outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    }
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
+            SegmentPushUtils.pushSegments(
+                spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case METADATA:
+          try {
+            Map<String, String> segmentUriToTarPathMap =
+                SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
+      }
+    }
+  }
+
+  private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
+      PushJobSpec pushJobSpec) {
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(tableName);
+
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+    PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    spec.setPushJobSpec(pushJobSpec);
+    spec.setTableSpec(tableSpec);
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+    spec.setAuthToken(taskConfigs.get(BatchConfigProperties.AUTH_TOKEN));
+
+    return spec;
+  }
+
+  private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
+      throws Exception {
+    if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {

Review Comment:
   1/ I feel that the task generator should always make sure the output segment dir is provided.
   2/ Using the local segment tar file does not help here, because most of the time servers cannot download segments from it.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -67,7 +72,35 @@ static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
     return PinotFSFactory.create(fileURIScheme);
   }
 
-  static PinotFS getLocalPinotFs() {
+  public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
+      ClusterInfoAccessor clusterInfoAccessor) {
+    try {
+      URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);

Review Comment:
   I agree that it is not really used in the case when push mode is TAR. However, I feel the current logic is a little bit hard to follow if we don't have the full context. 
   
   If we can check push mode first then decide what to do, it's easier to follow. WDYT?





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1038164529


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -67,7 +72,35 @@ static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
     return PinotFSFactory.create(fileURIScheme);
   }
 
-  static PinotFS getLocalPinotFs() {
+  public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
+      ClusterInfoAccessor clusterInfoAccessor) {
+    try {
+      URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);

Review Comment:
   Cool. Made the changes. A lot of the code used here has been borrowed from SegmentGenerationAndPushTask. I will also raise another PR to port some of these changes from here to there.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java:
##########
@@ -79,15 +81,18 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
   protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
   protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
   protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
+  protected final File _segmentDir4 = new File(_tempDir, "segmentDir4");
   protected final File _tarDir1 = new File(_tempDir, "tarDir1");
   protected final File _tarDir2 = new File(_tempDir, "tarDir2");
   protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+  protected final File _tarDir4 = new File(_tempDir, "tarDir4");
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _tarDir1, _tarDir2,
-        _tarDir3);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir,

Review Comment:
   fixed!





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion tas

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1025584569


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionPushUtils.java:
##########
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicHeader;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MinionPushUtils implements Serializable {

Review Comment:
   Yeah, I will remove this. I created this class assuming a lot of changes might be required in the SegmentPushUtils methods, but it turns out that was not the case.





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1026059190


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
+        LOGGER.info("Moved generated segment from [{}] to location: [{}]",

Review Comment:
   One solution can be to always specify a separate OUTPUT_DIR and always clean that dir in case the task fails. Even if we fail to clean the outputDir, it should not be a problem as long as it is separate from the deep store.





[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1026701592


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -571,6 +577,40 @@ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> sel
     return pinotTaskConfigs;
   }
 
+  private Map<String, String> getPushTaskConfig(Map<String, String> batchConfigMap, String downloadUrls) {
+    try {
+      String[] downloadURLList = downloadUrls.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
+      if (downloadURLList.length > 0) {
+        String downloadUrl = downloadURLList[0];
+        URI downloadURI = URI.create(downloadUrl);
+        URI outputDirURI = null;
+        if (!downloadURI.getScheme().contentEquals("http")) {

Review Comment:
   If that's the goal, then we just need to use `controllerConfig.getDataDir()/tableName`. What do you think?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -242,6 +256,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
             new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
                 segmentZKMetadataCustomMapModifier.toJsonString());
 
+        URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
+        LOGGER.info("Moved generated segment from [{}] to location: [{}]",

Review Comment:
   Personally, I think this may not be a problem. Because Minion tasks should be idempotent, a later retry should generate segments with the same names, and the existing ones will be reused or overwritten.
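
   To illustrate the overwrite-on-retry argument, here is a small sketch that uses plain java.nio on the local file system as a stand-in for PinotFS. The class and method names are hypothetical and this is not the PR's moveSegmentToOutputPinotFS. It only shows that a deterministic destination name plus an overwriting move makes a repeated run harmless.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class IdempotentMoveSketch {
  /**
   * Moves the tarred segment into outputDir under its own file name.
   * A retry that regenerates a segment with the same name lands on the same
   * destination and replaces the leftover from the failed attempt.
   */
  static Path moveToOutputDir(Path localSegmentTar, Path outputDir) {
    try {
      Files.createDirectories(outputDir);
      Path destination = outputDir.resolve(localSegmentTar.getFileName());
      return Files.move(localSegmentTar, destination, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   The argument depends on the segment name being stable across retries; if the name included a timestamp or random suffix, stale files would accumulate instead.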





[GitHub] [pinot] KKcorps commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1027867544


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -571,6 +577,40 @@ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> sel
     return pinotTaskConfigs;
   }
 
+  private Map<String, String> getPushTaskConfig(Map<String, String> batchConfigMap, String downloadUrls) {
+    try {
+      String[] downloadURLList = downloadUrls.split(MinionConstants.SEGMENT_NAME_SEPARATOR);
+      if (downloadURLList.length > 0) {
+        String downloadUrl = downloadURLList[0];
+        URI downloadURI = URI.create(downloadUrl);
+        URI outputDirURI = null;
+        if (!downloadURI.getScheme().contentEquals("http")) {

Review Comment:
   Added the commit to use dataDir.





[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9825: Allow segment upload via Metadata in MergeRollup Minion task

Posted by GitBox <gi...@apache.org>.
zhtaoxiang commented on code in PR #9825:
URL: https://github.com/apache/pinot/pull/9825#discussion_r1028352023


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -67,7 +72,35 @@ static PinotFS getOutputPinotFS(Map<String, String> taskConfigs, URI fileURI)
     return PinotFSFactory.create(fileURIScheme);
   }
 
-  static PinotFS getLocalPinotFs() {
+  public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
+      ClusterInfoAccessor clusterInfoAccessor) {
+    try {
+      URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);

Review Comment:
   I think we should check push mode first before deciding the outputDir? 
   
   If push mode is tar, we simply need to build segment locally and tar push to Pinot.
   
   Let me know if I misunderstood something
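
   A rough sketch of the "check push mode first" ordering, under stated assumptions: the class name, the enum, the key strings, and passing dataDir as a plain String are all hypothetical, whereas the PR obtains the data dir from ClusterInfoAccessor and the keys from BatchConfigProperties.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class PushModeFirstSketch {
  enum PushMode { TAR, METADATA }

  // Placeholder key strings for illustration only.
  static final String PUSH_MODE_KEY = "pushMode";
  static final String OUTPUT_DIR_KEY = "outputSegmentDirURI";

  static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
      String dataDir) {
    Map<String, String> pushConfig = new HashMap<>(taskConfigs);
    // Assumption: default to TAR when no push mode is configured.
    PushMode mode =
        PushMode.valueOf(taskConfigs.getOrDefault(PUSH_MODE_KEY, "TAR").toUpperCase(Locale.ROOT));
    switch (mode) {
      case TAR:
        // Segment is built locally and uploaded as a tar; no output dir needed.
        break;
      case METADATA:
        // Only metadata push needs a location that servers can download from.
        pushConfig.put(OUTPUT_DIR_KEY, dataDir + "/" + tableName);
        break;
      default:
        throw new UnsupportedOperationException("Unrecognized push mode - " + mode);
    }
    return pushConfig;
  }
}
```

   Branching on the mode up front keeps the output dir out of the TAR path entirely, so a reader does not need the full context to see when it is used.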


