Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/05/25 19:08:33 UTC

[GitHub] [incubator-pinot] jtao15 opened a new pull request #6975: Merge rollup executor enhancement

jtao15 opened a new pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975


   Part of #2715, depends on #6932.
   
    Merge/Rollup task executor enhancement:
       1. Wire SegmentProcessorFramework
       2. Wire segment replacement protocol
       3. Write bucket granularity info into the resulting segment ZK metadata.
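Item 2 brackets the upload with a lineage update. The `BaseMultipleSegmentsConversionExecutor` diff quoted later in this thread calls `SegmentConversionUtils.startSegmentReplace` with the input (`segmentsFrom`) and output (`segmentsTo`) segment names and keeps the returned `lineageEntryId`. A later comment also refers to matching start/end segment replacement APIs. The Java sketch below models that start, upload, end lifecycle with an in-memory map. `LineageSketch`, its `IN_PROGRESS`/`COMPLETED` states, and the rule for which segments are visible are illustrative assumptions, not Pinot's controller or broker implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of a start/end segment replacement handshake.
// Not Pinot's implementation; names, states and visibility rule are assumptions.
final class LineageSketch {
  enum State { IN_PROGRESS, COMPLETED }

  static final class Entry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    State state = State.IN_PROGRESS;

    Entry(List<String> segmentsFrom, List<String> segmentsTo) {
      this.segmentsFrom = new ArrayList<>(segmentsFrom);
      this.segmentsTo = new ArrayList<>(segmentsTo);
    }
  }

  private final Map<String, Entry> entries = new HashMap<>();

  // Step 1: record that segmentsFrom will be replaced by segmentsTo; return an entry id.
  String startReplace(List<String> segmentsFrom, List<String> segmentsTo) {
    String id = UUID.randomUUID().toString();
    entries.put(id, new Entry(segmentsFrom, segmentsTo));
    return id;
  }

  // Step 3: once every output segment has been uploaded, mark the entry as completed.
  void endReplace(String id, List<String> uploadedSegments) {
    Entry entry = entries.get(id);
    if (entry == null || entry.state != State.IN_PROGRESS) {
      throw new IllegalStateException("No in-progress lineage entry: " + id);
    }
    if (!uploadedSegments.containsAll(entry.segmentsTo)) {
      throw new IllegalStateException("Not all output segments were uploaded");
    }
    entry.state = State.COMPLETED;
  }

  // In this sketch, readers see the inputs until the entry completes, then only the outputs.
  List<String> visibleSegments(String id) {
    Entry entry = entries.get(id);
    return entry.state == State.COMPLETED ? entry.segmentsTo : entry.segmentsFrom;
  }
}
```

For example, after `startReplace(List.of("seg_0", "seg_1"), List.of("merged_0"))`, `visibleSegments` returns the two inputs until `endReplace` is called with `merged_0` uploaded, and then returns only `merged_0`. In this sketch, that explicit switch is what keeps a reader from seeing the inputs and the merged output at the same time.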
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641728244



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+        Header segmentZKMetadataCustomMapModifierHeader =
+            new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+                segmentZKMetadataCustomMapModifier.toJsonString());
+
+        List<Header> httpHeaders = new ArrayList<>();
+        httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+        httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));

Review comment:
       I'm asking about `authToken` :) 
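For context on the hunk quoted above: `segmentsFrom` is derived by splitting the comma-separated `inputSegmentNames` value and trimming each name, while `segmentsTo` comes from the conversion results in order. The standalone Java sketch below reproduces only that list-building step. `ConversionResult` is a hypothetical stand-in for `SegmentConversionResult`, and only its segment name is modeled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for SegmentConversionResult; only the segment name is modeled.
final class ConversionResult {
  private final String segmentName;

  ConversionResult(String segmentName) {
    this.segmentName = segmentName;
  }

  String getSegmentName() {
    return segmentName;
  }
}

final class ReplaceRequestLists {
  // Mirrors the quoted diff: split on ',' and trim whitespace around each input segment name.
  static List<String> segmentsFrom(String inputSegmentNames) {
    return Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
  }

  // Output segment names, in the same order as the conversion results.
  static List<String> segmentsTo(List<ConversionResult> results) {
    return results.stream().map(ConversionResult::getSegmentName).collect(Collectors.toList());
  }
}
```

For example, `segmentsFrom("seg_0, seg_1 ,seg_2")` yields `[seg_0, seg_1, seg_2]`, so stray spaces around the commas do not leak into the lineage request.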






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644255925



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
 
   @Override
   protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
-    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections

Review comment:
       Let's add a test for `SegmentZKMetadataCustomMapModifier` in a future PR, when we add the integration test for the segment merge/roll-up task.
   
   Let's add a `TODO` comment for this (e.g., in `BaseMultipleSegmentsConversionExecutor`).
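The hunk above switches the purge task's modifier from `ModifyMode.REPLACE` to `ModifyMode.UPDATE`. A minimal Java sketch of the difference follows. It assumes REPLACE overwrites the whole custom map while UPDATE merges the new entries into the existing map, and, as a further assumption, that a null value means "remove this key". `CustomMapModifierSketch` and the key names are hypothetical; `SegmentZKMetadataCustomMapModifier` itself is the authority on the exact semantics.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of REPLACE vs UPDATE semantics for a segment's custom metadata map.
final class CustomMapModifierSketch {
  enum ModifyMode { REPLACE, UPDATE }

  private final ModifyMode mode;
  private final Map<String, String> changes;

  CustomMapModifierSketch(ModifyMode mode, Map<String, String> changes) {
    this.mode = mode;
    this.changes = changes;
  }

  // Returns a new map; the existing map is left untouched.
  Map<String, String> modifyMap(Map<String, String> existing) {
    Map<String, String> result = new HashMap<>();
    if (mode == ModifyMode.UPDATE && existing != null) {
      result.putAll(existing); // UPDATE keeps entries written by other tasks
    }
    for (Map.Entry<String, String> change : changes.entrySet()) {
      if (change.getValue() == null) {
        result.remove(change.getKey()); // assumption: null value means "delete this key"
      } else {
        result.put(change.getKey(), change.getValue());
      }
    }
    return result;
  }
}
```

Under REPLACE, writing the purge task's own key discards every other key in the map. Under UPDATE, keys written earlier survive, for example granularity info from a merge/roll-up task, if it is stored in the same custom map.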






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644342355



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessor.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ *   1. Add the support for roll-up
+ *   2. Add the support to make result segments time aligned
+ *   3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessor {

Review comment:
       Let's actually put this under `org.apache.pinot.core.minion` and rename it to `MergeRollupConverter`.
   
   We can delete `org.apache.pinot.core.minion.rollup` and `org.apache.pinot.core.minion.segment` in a follow-up PR as a clean-up.






[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644326007



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessorFramework.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ *   1. Add the support for roll-up
+ *   2. Add the support to make result segments time aligned
+ *   3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessorFramework {
+  private List<File> _originalIndexDirs;
+  private final File _workingDir;
+  private final String _inputSegmentDir;
+  private final String _outputSegmentDir;
+  private final SegmentProcessorConfig _segmentProcessorConfig;
+
+  private MergeRollupSegmentProcessorFramework(TableConfig tableConfig, Schema schema, String mergeType,

Review comment:
       Done






[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641733204



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+        Header segmentZKMetadataCustomMapModifierHeader =
+            new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+                segmentZKMetadataCustomMapModifier.toJsonString());
+
+        List<Header> httpHeaders = new ArrayList<>();
+        httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+        httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));

Review comment:
       Oh, `authToken` was passed before as well.
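The quoted loop attaches two kinds of HTTP headers to each segment upload: the custom-map modifier serialized to JSON, followed by whatever `FileUploadDownloadClient.makeAuthHeader(authToken)` returns (the same `authToken` that, per the reply, was already being passed before this change). The JDK-only sketch below mirrors that ordering with plain name/value pairs instead of Apache HttpClient `Header` objects. The header name, the `Authorization` form, and the "no auth header when the token is absent" behavior are assumptions for illustration; the modifier JSON is taken as a precomputed string.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class UploadHeadersSketch {
  // Hypothetical header name; the real one is FileUploadDownloadClient.CustomHeaders
  // .SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER, whose value is not reproduced here.
  static final String MODIFIER_HEADER = "X-Hypothetical-Custom-Map-Modifier";

  // Assumption: no auth header is sent when the token is null or empty.
  static List<Map.Entry<String, String>> makeAuthHeader(String authToken) {
    List<Map.Entry<String, String>> headers = new ArrayList<>();
    if (authToken != null && !authToken.isEmpty()) {
      headers.add(new SimpleImmutableEntry<>("Authorization", authToken));
    }
    return headers;
  }

  // Same order as the diff: modifier header first, then any auth headers.
  static List<Map.Entry<String, String>> uploadHeaders(String modifierJson, String authToken) {
    List<Map.Entry<String, String>> headers = new ArrayList<>();
    headers.add(new SimpleImmutableEntry<>(MODIFIER_HEADER, modifierJson));
    headers.addAll(makeAuthHeader(authToken));
    return headers;
  }
}
```

With a token, the upload carries two headers; without one, only the modifier header is sent, so the change does not alter how authentication was handled before.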






[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ea4d245) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/4944731579de1b17be23404618e1bb4dd8e5e511?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4944731) will **decrease** coverage by `0.08%`.
   > The diff coverage is `59.21%`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #6975      +/-   ##
   ============================================
   - Coverage     73.20%   73.11%   -0.09%     
     Complexity       12       12              
   ============================================
     Files          1451     1452       +1     
     Lines         71832    71969     +137     
     Branches      10415    10423       +8     
   ============================================
   + Hits          52585    52621      +36     
   - Misses        15715    15817     +102     
   + Partials       3532     3531       -1     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `41.73% <10.52%> (-0.17%)` | :arrow_down: |
   | unittests | `65.37% <48.68%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...he/pinot/plugin/minion/tasks/BaseTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZVRhc2tFeGVjdXRvci5qYXZh) | `83.33% <ø> (ø)` | |
   | [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
   | [...rt\_to\_raw\_index/ConvertToRawIndexTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvY29udmVydF90b19yYXdfaW5kZXgvQ29udmVydFRvUmF3SW5kZXhUYXNrRXhlY3V0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
   | [...and\_push/SegmentGenerationAndPushTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudF9nZW5lcmF0aW9uX2FuZF9wdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
   | [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `81.01% <57.14%> (-8.82%)` | :arrow_down: |
   | [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `88.57% <82.60%> (-6.89%)` | :arrow_down: |
   | [...apache/pinot/core/minion/MergeRollupConverter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vTWVyZ2VSb2xsdXBDb252ZXJ0ZXIuamF2YQ==) | `88.52% <88.52%> (ø)` | |
   | ... and [37 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4944731...ea4d245](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
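The headline figure and the table can look inconsistent: a reported decrease of `0.08%` next to a `-0.09%` row. A plausible reading, assuming Codecov's default two-decimal precision with `round: down`, is that coverage is `Hits / Lines` with `Lines = Hits + Misses + Partials`, each percentage is truncated, and the headline delta is computed from the untruncated ratios. The Java sketch below checks that reading against this report's numbers using integer arithmetic, so no floating-point rounding is involved.

```java
// Checks Codecov report arithmetic, assuming coverage = hits / lines and
// two-decimal truncation ("round: down"). Results are in hundredths of a percent.
final class CoverageCheck {
  // Truncated coverage, e.g. 7320 means 73.20%.
  static long truncatedCoverage(long hits, long lines) {
    return hits * 10_000 / lines;
  }

  // Delta from finer-grained ratios (units of 1e-4 percent), truncated toward zero
  // to hundredths of a percent.
  static long truncatedDelta(long baseHits, long baseLines, long headHits, long headLines) {
    long base = baseHits * 1_000_000 / baseLines;
    long head = headHits * 1_000_000 / headLines;
    return (head - base) / 100; // Java integer division truncates toward zero
  }

  public static void main(String[] args) {
    // master vs. #6975 at ea4d245, numbers taken from the coverage diff above
    System.out.println(truncatedCoverage(52585, 71832)); // 7320 -> 73.20%
    System.out.println(truncatedCoverage(52621, 71969)); // 7311 -> 73.11%
    System.out.println(truncatedDelta(52585, 71832, 52621, 71969)); // -8 -> -0.08%
  }
}
```

The truncated percentages differ by 0.09 points, while the untruncated ratios differ by about 0.089 points, which truncates to the `0.08%` in the headline. The earlier report for 374ba8e fits the same reading: 73.23% to 73.19%, with a reported decrease of `0.04%`.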
   




[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644252489



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessorFramework.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ *   1. Add the support for roll-up
+ *   2. Add the support to make result segments time aligned
+ *   3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessorFramework {
+  private List<File> _originalIndexDirs;
+  private final File _workingDir;
+  private final String _inputSegmentDir;
+  private final String _outputSegmentDir;
+  private final SegmentProcessorConfig _segmentProcessorConfig;
+
+  private MergeRollupSegmentProcessorFramework(TableConfig tableConfig, Schema schema, String mergeType,

Review comment:
       Let's rename this to `MergeRollupSegmentProcessor`. 

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       Does this mean that we will always modify the segmentZKMetadata?
   
   How do we control when to modify it and when not to? (e.g., based on our design, there are some cases where we should not mark the granularity)
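One possible shape for the control this question asks about, sketched in Java under loud assumptions: the executor produces no custom-map changes (and therefore sends no modifier header) unless the task config carries a granularity value, so tasks that should not mark granularity leave the segment ZK metadata untouched. The config key `hypothetical.bucketGranularity` and the `Optional`-returning method are hypothetical and not part of this PR.

```java
import java.util.Map;
import java.util.Optional;

final class ModifierGateSketch {
  // Hypothetical task config key; not a real Pinot constant.
  static final String GRANULARITY_KEY = "hypothetical.bucketGranularity";

  // Returns the custom-map entries to write, or empty when nothing should be modified.
  static Optional<Map<String, String>> customMapChanges(Map<String, String> taskConfigs) {
    String granularity = taskConfigs.get(GRANULARITY_KEY);
    if (granularity == null || granularity.isEmpty()) {
      return Optional.empty(); // caller skips the modifier header entirely
    }
    return Optional.of(Map.of(GRANULARITY_KEY, granularity));
  }
}
```

Making "no modification" an explicit empty result keeps the decision in one place instead of relying on every executor to return a harmless modifier.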

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
 
   @Override
   protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
-    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections

Review comment:
       Let's add a test for `SegmentZKMetadataCustomMapModifier` in a future PR, when we add the integration test for the segment merge/roll-up task.






[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641692165



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -172,7 +185,7 @@ public static URI getDeleteSegmentHttpUri(String host, int port, String rawTable
       String tableType)
       throws URISyntaxException {
     return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
-        rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + tableType));
+        rawTableName + "/" + URIUtils.encode(segmentName) + "?" + TYPE_DELIMITER + tableType));

Review comment:
       I'm changing `TYPE_DELIMITER` because I need to use `type=` instead of `?type=`. The current `getURI(String protocol, String host, int port, String path)` quotes `?`, so I created another `getURI()` function to build the correct URI for the start/end segment replacement APIs. Maybe we can merge these two `getURI()` functions.
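The quoting described here matches how the multi-argument `java.net.URI` constructors treat a `?` inside the path argument: `?` is not a legal path character, so it is percent-encoded as `%3F` instead of starting a query. Passing the query as its own argument avoids that. The sketch below assumes the existing `getURI` forwards to the seven-argument constructor with a null query; the host, port, and path values are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class UriQuotingDemo {
  // Path-only variant: a '?' inside the path is quoted as %3F, so no query is produced.
  static URI pathOnly(String host, int port, String pathWithQuery) {
    try {
      return new URI("http", null, host, port, pathWithQuery, null, null);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(e);
    }
  }

  // Query passed separately: URI inserts the '?' itself and the query stays a query.
  static URI withQuery(String host, int port, String path, String query) {
    try {
      return new URI("http", null, host, port, path, query, null);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(pathOnly("localhost", 9000, "/segments/myTable/startReplaceSegments?type=OFFLINE"));
    // http://localhost:9000/segments/myTable/startReplaceSegments%3Ftype=OFFLINE
    System.out.println(withQuery("localhost", 9000, "/segments/myTable/startReplaceSegments", "type=OFFLINE"));
    // http://localhost:9000/segments/myTable/startReplaceSegments?type=OFFLINE
  }
}
```

A single `getURI` that takes an optional query argument would cover both call sites, which is one way the two functions could be merged.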






[GitHub] [incubator-pinot] codecov-commenter commented on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (374ba8e) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/3c7dfcd8d65bbfeceb4af77d23298515251eaae6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3c7dfcd) will **decrease** coverage by `0.04%`.
   > The diff coverage is `27.98%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #6975      +/-   ##
   ============================================
   - Coverage     73.23%   73.19%   -0.05%     
     Complexity       12       12              
   ============================================
     Files          1437     1441       +4     
     Lines         71320    71523     +203     
     Branches      10332    10354      +22     
   ============================================
   + Hits          52230    52349     +119     
   - Misses        15586    15661      +75     
   - Partials       3504     3513       +9     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `41.94% <5.04%> (+0.02%)` | :arrow_up: |
   | unittests | `65.21% <22.93%> (-0.10%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/common/minion/MergeRollupTaskMetadata.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01lcmdlUm9sbHVwVGFza01ldGFkYXRhLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...e/pinot/common/minion/MinionTaskMetadataUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01pbmlvblRhc2tNZXRhZGF0YVV0aWxzLmphdmE=) | `50.00% <0.00%> (-25.00%)` | :arrow_down: |
   | [...troller/helix/core/minion/ClusterInfoAccessor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9DbHVzdGVySW5mb0FjY2Vzc29yLmphdmE=) | `71.42% <0.00%> (-8.58%)` | :arrow_down: |
   | [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
   | [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
   | [...inion/tasks/merge\_rollup/MergeRollupTaskUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza1V0aWxzLmphdmE=) | `11.11% <11.11%> (ø)` | |
   | [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `80.51% <50.00%> (-9.32%)` | :arrow_down: |
   | [...apache/pinot/core/minion/rollup/CollectorType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vcm9sbHVwL0NvbGxlY3RvclR5cGUuamF2YQ==) | `60.00% <57.14%> (ø)` | |
   | ... and [40 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3c7dfcd...374ba8e](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641760045



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+    }
 
-    MergeRollupSegmentConverter rollupSegmentConverter =
-        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
-            .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
-            .setTableConfig(tableConfig).build();
+    // Aggregations using configured Collector
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    CollectorConfig collectorConfig =
+        getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+    Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+        "Only 'CONCAT' mode is currently supported.");
+    segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
 
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {
+      SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+      segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
+    }
+
+    SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
+
+    File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+    Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
+        inputSegmentsDir.getAbsolutePath(), taskType);
+    for (File indexDir : originalIndexDirs) {
+      FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+    }
+    File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
+    Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
+        outputSegmentsDir.getAbsolutePath(), taskType);
+
+    SegmentProcessorFramework segmentProcessorFramework =
+        new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+    try {
+      segmentProcessorFramework.processSegments();
+    } finally {
+      segmentProcessorFramework.cleanup();
+    }
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
     List<SegmentConversionResult> results = new ArrayList<>();
-    for (File file : resultFiles) {
+    for (File file : outputSegmentsDir.listFiles()) {
       String outputSegmentName = file.getName();
       results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
           .setTableNameWithType(tableNameWithType).build());
     }
     return results;
   }
+
+  @Override
+  protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig) {
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
+        .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
+            pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY)));

Review comment:
       Do we have a guarantee that granularity values are always uppercase (e.g., `DAILY`, `HOURLY`)?
   
   I think the following inputs should all work: `hourly`, `Hourly`, `HOURLY`, etc.
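One way to accept any casing is to normalize once before parsing. This is sketched with a hypothetical `Granularity` enum that contains only the two values mentioned above; the real set of values is not shown in this thread. `Locale.ROOT` keeps the uppercase conversion independent of the JVM's default locale.

```java
import java.util.Locale;

// Hypothetical sketch; the enum below is illustrative, not Pinot's actual type.
public final class GranularitySketch {
  public enum Granularity { HOURLY, DAILY }

  private GranularitySketch() {
  }

  // Accepts "hourly", "Hourly", "HOURLY", ... plus surrounding whitespace.
  // Locale.ROOT avoids locale-specific case rules (e.g. the Turkish dotted/dotless i).
  // Enum.valueOf throws IllegalArgumentException for unknown values.
  public static Granularity parse(String value) {
    if (value == null) {
      throw new IllegalArgumentException("Granularity cannot be null");
    }
    return Granularity.valueOf(value.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    for (String s : new String[] {"hourly", "Hourly", "HOURLY"}) {
      System.out.println(s + " -> " + parse(s));
    }
  }
}
```

Storing `parse(value).name()` in the segment ZK metadata would then always write the canonical uppercase form, regardless of how the task config spelled it.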

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {

Review comment:
       Let's parse the string to an integer here instead of passing the string value to `getSegmentConfig()`. Also, we can use `1000000` as the default value.
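A minimal sketch of parsing the config up front with the suggested `1000000` default, so that `getSegmentConfig()` could take an `int`. The class and method names are hypothetical, and rejecting non-positive values is an added assumption rather than something the PR specifies.

```java
// Hypothetical helper; the 1000000 default comes from the review comment above.
public final class MaxRecordsConfigSketch {
  public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 1_000_000;

  private MaxRecordsConfigSketch() {
  }

  // Returns the default when the config is absent or blank;
  // rejects malformed or non-positive values.
  public static int parseMaxNumRecordsPerSegment(String value) {
    if (value == null || value.trim().isEmpty()) {
      return DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
    }
    int parsed;
    try {
      parsed = Integer.parseInt(value.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid max num records per segment: " + value, e);
    }
    if (parsed <= 0) {
      throw new IllegalArgumentException("Max num records per segment must be positive: " + value);
    }
    return parsed;
  }

  public static void main(String[] args) {
    System.out.println(parseMaxNumRecordsPerSegment(null));     // 1000000
    System.out.println(parseMaxNumRecordsPerSegment("500000")); // 500000
  }
}
```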

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {
+      SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+      segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);

Review comment:
       What happens if we don't set `SegmentConfig`?

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,

Review comment:
       Instead of directly using `SegmentProcessorFramework`, can we wrap this in a separate class, similar to `MergeRollupSegmentConverter`?
   
   That way, we can reuse `MergeRollupSegmentConverter` in the `SegmentMergeCommand`.
   

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
 
   @Override
   protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
-    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections

Review comment:
       Is there an existing test for the segment upload API? Can you check whether there's a test for `SegmentZKMetadataCustomMapModifier`? If so, we can add a test for this change (`REPLACE` -> `UPDATE`).
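A minimal, self-contained model of what such a test would pin down. It is not Pinot's `SegmentZKMetadataCustomMapModifier`; it only encodes the behavior the mode names suggest (`REPLACE` overwrites the whole custom map, `UPDATE` merges new entries over existing ones), and the map keys are placeholders. Presumably the switch to `UPDATE` is meant to keep keys written by other tasks, such as the merge/rollup bucket granularity, when the purge task uploads a segment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two modify modes; NOT Pinot's implementation.
public final class CustomMapModeSketch {
  public enum ModifyMode { REPLACE, UPDATE }

  private CustomMapModeSketch() {
  }

  // REPLACE: the new entries become the whole custom map.
  // UPDATE: the new entries are merged over the existing ones, so keys written
  // by other tasks survive.
  public static Map<String, String> apply(ModifyMode mode, Map<String, String> existing,
      Map<String, String> updates) {
    Map<String, String> result = new HashMap<>();
    if (mode == ModifyMode.UPDATE && existing != null) {
      result.putAll(existing);
    }
    result.putAll(updates);
    return result;
  }

  public static void main(String[] args) {
    // Placeholder keys standing in for a merge/rollup key and a purge key.
    Map<String, String> existing = Collections.singletonMap("granularityKey", "DAILY");
    Map<String, String> purge = Collections.singletonMap("purgeKey", "1");
    System.out.println(apply(ModifyMode.REPLACE, existing, purge)); // granularityKey dropped
    System.out.println(apply(ModifyMode.UPDATE, existing, purge));  // both keys kept
  }
}
```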

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -296,6 +309,21 @@ public static URI getUploadSegmentURI(URI controllerURI)
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
   }
 
+  public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType)
+      throws URISyntaxException {
+    return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),

Review comment:
       I remember that using `getURI()` somehow didn't work, so I ended up using `new URI(...)`. Can you double-check this against https://github.com/apache/incubator-pinot/pull/6094?
   
   ```
   http://localhost:18998/segments/mytable/startReplaceSegments%3Ftype=OFFLINE <- getURI(),
   http://localhost:18998/segments/mytable/startReplaceSegments?type=OFFLINE  <- new URI(..)
   ``` 
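The two URLs above can be reproduced with plain `java.net.URI`. This sketch assumes, without checking Pinot's source, that `getURI()` uses the seven-argument constructor. That constructor percent-encodes `?` when it appears in the path component, while the single-string constructor parses `?` as the start of the query. The class name is hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Reproduces the two URLs quoted above with plain java.net.URI calls.
public final class ReplaceSegmentsUriDemo {
  private ReplaceSegmentsUriDemo() {
  }

  // Multi-argument constructor: '?' inside the path component is quoted to %3F.
  public static URI viaComponents() throws URISyntaxException {
    return new URI("http", null, "localhost", 18998,
        "/segments/mytable/startReplaceSegments?type=OFFLINE", null, null);
  }

  // Single-string constructor: the string is parsed, so '?' starts the query.
  public static URI viaParsing() throws URISyntaxException {
    return new URI("http://localhost:18998/segments/mytable/startReplaceSegments?type=OFFLINE");
  }

  public static void main(String[] args) throws URISyntaxException {
    // http://localhost:18998/segments/mytable/startReplaceSegments%3Ftype=OFFLINE
    System.out.println(viaComponents());
    // http://localhost:18998/segments/mytable/startReplaceSegments?type=OFFLINE
    System.out.println(viaParsing());
  }
}
```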

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);

Review comment:
       Why do we read the partition config from the table config and pass it as the partitioner config to the segment processor framework?
   
   We should handle this in the task scheduler instead: when custom partitioning is enabled, the scheduler should only pick segments from the same partition.
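A minimal sketch of the scheduler-side alternative: group the candidate segments by partition id and build one task per group. `groupByPartition` and the segment-to-partition map are hypothetical. How the scheduler would obtain partition ids (for example, from segment metadata) is an assumption not covered in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical scheduler-side helper: bucket candidate segments by partition id
// so that each merge task only receives segments from a single partition.
public final class PartitionGroupingSketch {
  private PartitionGroupingSketch() {
  }

  // segmentToPartition: candidate segment name -> partition id.
  // TreeMap keeps the partition order deterministic.
  public static Map<Integer, List<String>> groupByPartition(Map<String, Integer> segmentToPartition) {
    Map<Integer, List<String>> groups = new TreeMap<>();
    for (Map.Entry<String, Integer> entry : segmentToPartition.entrySet()) {
      groups.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
    }
    return groups;
  }

  public static void main(String[] args) {
    Map<String, Integer> candidates = new TreeMap<>();
    candidates.put("seg_0", 0);
    candidates.put("seg_1", 1);
    candidates.put("seg_2", 0);
    // One task per entry: {0=[seg_0, seg_2], 1=[seg_1]}
    System.out.println(groupByPartition(candidates));
  }
}
```

Keeping this in the scheduler means the executor never mixes partitions, so it would not need to rebuild a partitioner from the table config.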




[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4b54e4f) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/211cf8a8b9476a9756d2b40f1d0660cfcb1d4235?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (211cf8a) will **decrease** coverage by `7.83%`.
   > The diff coverage is `29.16%`.
   
   > :exclamation: Current head 4b54e4f differs from pull request most recent head 4806a5f. Consider uploading reports for the commit 4806a5f to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #6975      +/-   ##
   ============================================
   - Coverage     73.30%   65.47%   -7.84%     
     Complexity       12       12              
   ============================================
     Files          1447     1447              
     Lines         71627    71734     +107     
     Branches      10391    10400       +9     
   ============================================
   - Hits          52506    46965    -5541     
   - Misses        15585    21366    +5781     
   + Partials       3536     3403     -133     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `?` | |
   | unittests | `65.47% <29.16%> (-0.07%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
   | [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `2.59% <0.00%> (-87.24%)` | :arrow_down: |
   | [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `0.00% <0.00%> (-52.95%)` | :arrow_down: |
   | [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
   | [...egments/RealtimeToOfflineSegmentsTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcmVhbHRpbWVfdG9fb2ZmbGluZV9zZWdtZW50cy9SZWFsdGltZVRvT2ZmbGluZVNlZ21lbnRzVGFza0V4ZWN1dG9yLmphdmE=) | `78.26% <0.00%> (-11.45%)` | :arrow_down: |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `19.46% <5.26%> (-38.91%)` | :arrow_down: |
   | [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `65.71% <58.62%> (-29.75%)` | :arrow_down: |
   | [...a/org/apache/pinot/minion/metrics/MinionMeter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtbWluaW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9taW5pb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/minion/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtbWluaW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9taW5pb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [344 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [211cf8a...4806a5f](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   



[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644338809



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       I think that we should pass `SegmentConversionResult` instead of `resultSegmentName`. It has a `customProperties` field that we can probably use here.
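A minimal sketch of what this suggestion could look like, assuming `SegmentConversionResult` carries a segment name plus a `customProperties` map. The stand-in `ConversionResult` class, the `customMapFor` helper and the `bucketGranularity` key are hypothetical, not Pinot's actual API. Because the hook receives the whole result, per-segment values set during conversion can flow into that segment's custom map.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: pass the whole conversion result to the custom-map hook. */
public class ConversionResultHookSketch {
  // Stand-in for SegmentConversionResult (assumed shape: name + customProperties).
  static final class ConversionResult {
    final String segmentName;
    final Map<String, Object> customProperties;

    ConversionResult(String segmentName, Map<String, Object> customProperties) {
      this.segmentName = segmentName;
      this.customProperties = customProperties;
    }
  }

  // Hypothetical property key, not a real Pinot constant.
  static final String GRANULARITY_KEY = "bucketGranularity";

  // Builds the custom-map entries for one output segment from its own result.
  static Map<String, String> customMapFor(ConversionResult result) {
    Map<String, String> customMap = new HashMap<>();
    Object granularity = result.customProperties.get(GRANULARITY_KEY);
    if (granularity != null) {
      customMap.put(GRANULARITY_KEY, granularity.toString());
    }
    return customMap;
  }

  public static void main(String[] args) {
    List<ConversionResult> results = List.of(
        new ConversionResult("merged_0", Map.of(GRANULARITY_KEY, "DAY")),
        new ConversionResult("merged_1", Map.of()));
    for (ConversionResult result : results) {
      System.out.println(result.segmentName + " -> " + customMapFor(result));
    }
  }
}
```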

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessor.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ *   1. Add the support for roll-up
+ *   2. Add the support to make result segments time aligned
+ *   3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessor {

Review comment:
       Let's actually put this under `org.apache.pinot.core.minion` and rename it to `MergeRollupConverter`.
   
   We can delete `org.apache.pinot.core.minion.rollup` and `org.apache.pinot.core.minion.segment` in a follow-up PR as a clean-up.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
       File workingDir)
       throws Exception;
 
+  /**
+   * Returns the segment ZK metadata custom map modifier.
+   */
+  protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(

Review comment:
       Can we move this to `BaseTaskExecutor`?
   
   We have the same abstract method defined in `BaseSingleSegmentConversionExecutor`. After we move it to `BaseTaskExecutor`, we can remove it from both `BaseSingleSegmentConversionExecutor` and `BaseMultipleSegmentsConversionExecutor`.
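A sketch of the suggested refactor, with `Map<String, String>` standing in for both `SegmentZKMetadataCustomMapModifier` and the task config. The hook name `getCustomMapUpdates`, the two concrete executors and their map keys are hypothetical. The point is that the hook is declared once on `BaseTaskExecutor`, so neither conversion base has to redeclare it, while every concrete executor must still implement it.

```java
import java.util.Map;

/** Hypothetical sketch: declare the custom-map hook once, in the shared base class. */
public class HoistedHookSketch {
  abstract static class BaseTaskExecutor {
    // Single declaration of the hook for every executor type.
    protected abstract Map<String, String> getCustomMapUpdates(Map<String, String> taskConfigs);
  }

  // The intermediate bases no longer redeclare the hook.
  abstract static class BaseSingleSegmentConversionExecutor extends BaseTaskExecutor {
  }

  abstract static class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
  }

  // Concrete executors still implement it, whichever base they extend (keys are made up).
  static final class PurgeExecutor extends BaseSingleSegmentConversionExecutor {
    @Override
    protected Map<String, String> getCustomMapUpdates(Map<String, String> taskConfigs) {
      return Map.of("purgeTime", taskConfigs.getOrDefault("time", "0"));
    }
  }

  static final class MergeRollupExecutor extends BaseMultipleSegmentsConversionExecutor {
    @Override
    protected Map<String, String> getCustomMapUpdates(Map<String, String> taskConfigs) {
      return Map.of("mergeType", taskConfigs.getOrDefault("mergeType", "CONCATENATE"));
    }
  }

  public static void main(String[] args) {
    // Callers only need the BaseTaskExecutor type to reach the hook.
    BaseTaskExecutor[] executors = {new PurgeExecutor(), new MergeRollupExecutor()};
    for (BaseTaskExecutor executor : executors) {
      System.out.println(executor.getClass().getSimpleName() + " -> " + executor.getCustomMapUpdates(Map.of()));
    }
  }
}
```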





[GitHub] [incubator-pinot] jtao15 commented on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-851707013


   Updated the PR based on the previous comments. @snleee



[GitHub] [incubator-pinot] snleee commented on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-854092399


   LGTM. Thanks for addressing all the comments.



[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644370006



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       Updated.





[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641685492



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+        Header segmentZKMetadataCustomMapModifierHeader =
+            new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+                segmentZKMetadataCustomMapModifier.toJsonString());
+
+        List<Header> httpHeaders = new ArrayList<>();
+        httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+        httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));

Review comment:
       I'm adding the `SegmentZKMetadataCustomMapModifier` to the HTTP header so that the upload segment API updates the segment's custom map accordingly. For merged segments, we want to record their bucket granularity in the custom map so that the scheduler can tell which segments have already been merged.
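A self-contained sketch of that flow, assuming a modifier that either updates or replaces the existing custom map. The `ModifyMode` values, the `mergeRollup.bucketGranularity` key and `unmergedSegments` are illustrative assumptions, not the real Pinot classes. The upload path applies the modifier to the segment's current custom map, and the scheduler skips segments already tagged with the granularity it is about to merge at.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch: tag merged segments via a custom-map modifier, then filter on the tag. */
public class CustomMapModifierSketch {
  enum ModifyMode { UPDATE, REPLACE }

  static final class CustomMapModifier {
    final ModifyMode mode;
    final Map<String, String> entries;

    CustomMapModifier(ModifyMode mode, Map<String, String> entries) {
      this.mode = mode;
      this.entries = entries;
    }

    // Returns the new custom map; UPDATE keeps existing entries, REPLACE drops them.
    Map<String, String> apply(Map<String, String> existing) {
      Map<String, String> result = new HashMap<>();
      if (mode == ModifyMode.UPDATE && existing != null) {
        result.putAll(existing);
      }
      result.putAll(entries);
      return result;
    }
  }

  // Hypothetical key; the PR only says the bucket granularity is recorded.
  static final String GRANULARITY_KEY = "mergeRollup.bucketGranularity";

  // Scheduler side: segment names not yet merged at the given granularity, sorted.
  static List<String> unmergedSegments(Map<String, Map<String, String>> customMaps, String granularity) {
    return customMaps.entrySet().stream()
        .filter(e -> !granularity.equals(e.getValue().get(GRANULARITY_KEY)))
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    CustomMapModifier modifier = new CustomMapModifier(ModifyMode.UPDATE, Map.of(GRANULARITY_KEY, "DAY"));
    Map<String, Map<String, String>> customMaps = new HashMap<>();
    customMaps.put("merged_0", modifier.apply(Map.of("owner", "minion")));
    customMaps.put("raw_1", Map.of());
    System.out.println(customMaps.get("merged_0"));
    System.out.println(unmergedSegments(customMaps, "DAY"));
  }
}
```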





[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4b54e4f) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/211cf8a8b9476a9756d2b40f1d0660cfcb1d4235?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (211cf8a) will **decrease** coverage by `0.02%`.
   > The diff coverage is `38.33%`.
   
   > :exclamation: Current head 4b54e4f differs from pull request most recent head 4806a5f. Consider uploading reports for the commit 4806a5f to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6975/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #6975      +/-   ##
   ============================================
   - Coverage     73.30%   73.28%   -0.03%     
     Complexity       12       12              
   ============================================
     Files          1447     1447              
     Lines         71627    71734     +107     
     Branches      10391    10400       +9     
   ============================================
   + Hits          52506    52567      +61     
   - Misses        15585    15625      +40     
   - Partials       3536     3542       +6     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `41.86% <9.16%> (+0.09%)` | :arrow_up: |
   | unittests | `65.47% <29.16%> (-0.07%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
   | [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
   | [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `80.51% <50.00%> (-9.32%)` | :arrow_down: |
   | [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `65.71% <58.62%> (-29.75%)` | :arrow_down: |
   | [...egments/RealtimeToOfflineSegmentsTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcmVhbHRpbWVfdG9fb2ZmbGluZV9zZWdtZW50cy9SZWFsdGltZVRvT2ZmbGluZVNlZ21lbnRzVGFza0V4ZWN1dG9yLmphdmE=) | `89.85% <100.00%> (+0.14%)` | :arrow_up: |
   | [...org/apache/pinot/core/minion/rollup/MergeType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vcm9sbHVwL01lcmdlVHlwZS5qYXZh) | `60.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [...ller/helix/core/minion/TaskTypeMetricsUpdater.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9UYXNrVHlwZU1ldHJpY3NVcGRhdGVyLmphdmE=) | `80.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [...operator/filter/RangeIndexBasedFilterOperator.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9maWx0ZXIvUmFuZ2VJbmRleEJhc2VkRmlsdGVyT3BlcmF0b3IuamF2YQ==) | `40.42% <0.00%> (-4.26%)` | :arrow_down: |
   | ... and [23 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [211cf8a...4806a5f](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
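The percentages in both Codecov snapshots can be reproduced from the raw counts, assuming Codecov's ratio of hits / (hits + misses + partials), whose denominator matches the `Lines` row in each diff. The sketch below recomputes master, the earlier snapshot where the `integration` flag shows `?` (missing data), and this snapshot with integration data included. The comparison suggests, though it does not prove, that the earlier `-7.84%` mostly reflected the missing integration report rather than this change.

```java
import java.util.Locale;

/** Recomputes the coverage ratios quoted in the Codecov reports on PR #6975. */
public class CoverageCheck {
  // Assumed Codecov ratio: hits / (hits + misses + partials), as a percentage.
  static double coveragePercent(long hits, long misses, long partials) {
    return 100.0 * hits / (hits + misses + partials);
  }

  public static void main(String[] args) {
    double master = coveragePercent(52506, 15585, 3536);
    double headUnitOnly = coveragePercent(46965, 21366, 3403);
    double headWithIntegration = coveragePercent(52567, 15625, 3542);
    // Locale.ROOT keeps the decimal separator a '.' regardless of the JVM locale.
    System.out.printf(Locale.ROOT, "master:                   %.2f%%%n", master);
    System.out.printf(Locale.ROOT, "#6975 (integration: ?):   %.2f%%%n", headUnitOnly);
    System.out.printf(Locale.ROOT, "#6975 (with integration): %.2f%%%n", headWithIntegration);
  }
}
```

Running it prints 73.30%, 65.47% and 73.28%, matching the two reports.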
   



[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641846397



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);

Review comment:
       You are right, will remove this.






[GitHub] [incubator-pinot] snleee merged pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee merged pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975


   




[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ea4d245) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/4944731579de1b17be23404618e1bb4dd8e5e511?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4944731) will **decrease** coverage by `0.08%`.
   > The diff coverage is `59.21%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6975/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #6975      +/-   ##
   ============================================
   - Coverage     73.20%   73.11%   -0.09%     
     Complexity       12       12              
   ============================================
     Files          1451     1452       +1     
     Lines         71832    71969     +137     
     Branches      10415    10423       +8     
   ============================================
   + Hits          52585    52621      +36     
   - Misses        15715    15817     +102     
   + Partials       3532     3531       -1     
   ```
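As a cross-check of the Coverage Diff above: coverage here is hits divided by tracked lines (hits + misses + partials), and both reported percentages match rounding *down* to two decimals. That is Codecov's default `round: down` setting, which is assumed here rather than taken from this report. The following is a minimal sketch that reproduces both figures from the table's counts:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class CoverageMath {
  // Coverage % = hits / (hits + misses + partials) * 100, truncated to 2 decimals.
  static BigDecimal coverage(long hits, long misses, long partials) {
    long lines = hits + misses + partials;
    return BigDecimal.valueOf(hits)
        .multiply(BigDecimal.valueOf(100))
        .divide(BigDecimal.valueOf(lines), 2, RoundingMode.DOWN);
  }

  public static void main(String[] args) {
    System.out.println(coverage(52585, 15715, 3532)); // master: 73.20
    System.out.println(coverage(52621, 15817, 3531)); // #6975:  73.11
  }
}
```

Rounding to the nearest value instead would give 73.21% for master (52585 / 71832 is about 73.2055%), which does not match the report.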
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `41.73% <10.52%> (-0.17%)` | :arrow_down: |
   | unittests | `65.37% <48.68%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...he/pinot/plugin/minion/tasks/BaseTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZVRhc2tFeGVjdXRvci5qYXZh) | `83.33% <ø> (ø)` | |
   | [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
   | [...rt\_to\_raw\_index/ConvertToRawIndexTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvY29udmVydF90b19yYXdfaW5kZXgvQ29udmVydFRvUmF3SW5kZXhUYXNrRXhlY3V0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
   | [...and\_push/SegmentGenerationAndPushTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudF9nZW5lcmF0aW9uX2FuZF9wdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
   | [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `81.01% <57.14%> (-8.82%)` | :arrow_down: |
   | [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `88.57% <82.60%> (-6.89%)` | :arrow_down: |
   | [...apache/pinot/core/minion/MergeRollupConverter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vTWVyZ2VSb2xsdXBDb252ZXJ0ZXIuamF2YQ==) | `88.52% <88.52%> (ø)` | |
   | ... and [37 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4944731...ea4d245](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641844363



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+    }
 
-    MergeRollupSegmentConverter rollupSegmentConverter =
-        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
-            .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
-            .setTableConfig(tableConfig).build();
+    // Aggregations using configured Collector
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    CollectorConfig collectorConfig =
+        getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+    Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+        "Only 'CONCAT' mode is currently supported.");
+    segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
 
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {
+      SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+      segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
+    }
+
+    SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
+
+    File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+    Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
+        inputSegmentsDir.getAbsolutePath(), taskType);
+    for (File indexDir : originalIndexDirs) {
+      FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+    }
+    File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
+    Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
+        outputSegmentsDir.getAbsolutePath(), taskType);
+
+    SegmentProcessorFramework segmentProcessorFramework =
+        new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+    try {
+      segmentProcessorFramework.processSegments();
+    } finally {
+      segmentProcessorFramework.cleanup();
+    }
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
     List<SegmentConversionResult> results = new ArrayList<>();
-    for (File file : resultFiles) {
+    for (File file : outputSegmentsDir.listFiles()) {
       String outputSegmentName = file.getName();
       results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
           .setTableNameWithType(tableNameWithType).build());
     }
     return results;
   }
+
+  @Override
+  protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig) {
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
+        .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
+            pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY)));

Review comment:
       This will be controlled on the scheduler side: the scheduler should call `MergeRollupTaskUtils.getAllMergeProperties()` to get the granularity and use upper case when creating the task config. We could also use `pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY).toUpperCase()` here to make it less error-prone.
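The following is a minimal sketch of the normalization suggested above. It assumes the granularity arrives as a plain string in the task configs. `TASK_TYPE` and `BUCKET_GRANULARITY_SUFFIX` are hypothetical placeholder values, not the real `MinionConstants` definitions. Unlike a bare `.get(...).toUpperCase()`, it fails with a clear message when the key is missing instead of throwing a `NullPointerException`.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

final class GranularityNormalizer {
  // Hypothetical placeholders; the real MinionConstants values may differ.
  static final String TASK_TYPE = "MergeRollupTask";
  static final String BUCKET_GRANULARITY_SUFFIX = ".bucketGranularity";

  // Builds the single-entry ZK custom map update with an upper-cased granularity value.
  static Map<String, String> customMapUpdate(Map<String, String> taskConfigs, String granularityKey) {
    String granularity = taskConfigs.get(granularityKey);
    if (granularity == null) {
      throw new IllegalStateException("Missing task config: " + granularityKey);
    }
    // Locale.ROOT keeps the result independent of the JVM default locale.
    return Collections.singletonMap(TASK_TYPE + BUCKET_GRANULARITY_SUFFIX,
        granularity.trim().toUpperCase(Locale.ROOT));
  }
}
```

With `Locale.ROOT`, a value such as `daily` always becomes `DAILY`, even on a JVM whose default locale (e.g. Turkish) has special case-mapping rules for `i`.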






[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641846049



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
       throws Exception {
+    String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
     String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
     // TODO: add the support for rollup
     Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
 
-    MergeType mergeType = MergeType.fromString(mergeTypeString);
-    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
-    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
     TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
+    Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Partition config from tableConfig
+    if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+    }
 
-    MergeRollupSegmentConverter rollupSegmentConverter =
-        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
-            .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
-            .setTableConfig(tableConfig).build();
+    // Aggregations using configured Collector
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    CollectorConfig collectorConfig =
+        getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+    Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+        "Only 'CONCAT' mode is currently supported.");
+    segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
 
-    List<File> resultFiles = rollupSegmentConverter.convert();
+    // Segment config
+    if (numRecordsPerSegment != null) {
+      SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+      segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);

Review comment:
       If we don't set `SegmentConfig`, the framework will use the default one, which has `DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000`.
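The following sketch shows the fallback described above. It assumes the cap is parsed from the task config string. The 5,000,000 default is the value quoted in the comment. `estimateNumOutputSegments` is a hypothetical helper. It assumes each output segment is filled up to the cap before a new one is started, which may not match the framework exactly.

```java
final class SegmentConfigResolver {
  // Default quoted in the review comment for the framework's SegmentConfig.
  static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;

  // Returns the configured cap, or the framework default when the config is absent.
  static int resolveMaxNumRecords(String configured) {
    if (configured == null) {
      return DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
    }
    int value = Integer.parseInt(configured.trim());
    if (value <= 0) {
      throw new IllegalArgumentException("maxNumRecordsPerSegment must be positive: " + value);
    }
    return value;
  }

  // Ceiling division: number of segments needed if each one is filled to the cap.
  static int estimateNumOutputSegments(long totalRecords, int maxPerSegment) {
    return (int) ((totalRecords + maxPerSegment - 1) / maxPerSegment);
  }
}
```

Under that assumption, merging 12,000,000 records with no explicit config would produce 3 output segments.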






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644338809



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       I think that we should pass `SegmentConversionResult` instead of `resultSegmentName`. There's a `customProperties` and we can probably use that field.
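The following is a rough sketch of this suggestion. It uses a hypothetical, simplified stand-in for `SegmentConversionResult`; the real class and its `customProperties` accessors may look different. The point is that each output segment's ZK custom map is derived from its own conversion result rather than from the task config shared by all outputs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for SegmentConversionResult; not the real Pinot class.
final class ConversionResultSketch {
  private final String segmentName;
  private final Map<String, Object> customProperties = new HashMap<>();

  ConversionResultSketch(String segmentName) {
    this.segmentName = segmentName;
  }

  String getSegmentName() {
    return segmentName;
  }

  ConversionResultSketch withCustomProperty(String key, Object value) {
    customProperties.put(key, value);
    return this;
  }

  Object getCustomProperty(String key) {
    return customProperties.get(key);
  }
}

final class CustomMapFromResult {
  // Per-segment ZK custom map update; empty when the result carries no such property.
  static Map<String, String> customMapFor(ConversionResultSketch result, String key) {
    Object value = result.getCustomProperty(key);
    if (value == null) {
      return Collections.emptyMap();
    }
    return Collections.singletonMap(key, value.toString());
  }
}
```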






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644338809



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       I think that we should pass `SegmentConversionResult` instead of `resultSegmentName`.
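The hunk above builds `segmentsFrom` by splitting the comma-separated `inputSegmentNames` and trimming each entry. `String.split` drops trailing empty strings but keeps interior ones, so an input such as `a,,b` would put an empty segment name into the replace request. The following minimal sketch also filters empties (`parseSegmentNames` is a hypothetical helper name):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class SegmentNameParser {
  // Splits a comma-separated list of segment names, trimming whitespace and dropping blanks.
  static List<String> parseSegmentNames(String csv) {
    return Arrays.stream(csv.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .collect(Collectors.toList());
  }
}
```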







[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644327186



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       Changed to pass `resultSegmentName` in. Once time partitioning is introduced, `MergeRollupTaskExecutor` can decide whether to update the metadata based on the segment name.
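   A minimal sketch of what passing the segment name enables. It assumes a simplified stand-in for `SegmentZKMetadataCustomMapModifier` (a mode plus a custom map). The `merged_` name prefix, the `bucketGranularity` config key and the metadata key are hypothetical and used only for illustration. The PR only says the executor *can* branch on the segment name once time partitioning lands.

   ```java
   import java.util.Collections;
   import java.util.Map;

   // Simplified stand-in for the modifier: a mode name plus the custom map entries to apply.
   final class MapModifier {
     final String mode;
     final Map<String, String> customMap;

     MapModifier(String mode, Map<String, String> customMap) {
       this.mode = mode;
       this.customMap = customMap;
     }
   }

   abstract class MultiSegmentExecutorSketch {
     // The hook receives the output segment's name, so metadata can differ per result segment.
     protected abstract MapModifier getModifier(Map<String, String> taskConfigs, String resultSegmentName);
   }

   class MergeRollupExecutorSketch extends MultiSegmentExecutorSketch {
     // Hypothetical prefix and metadata key, for illustration only.
     static final String MERGED_PREFIX = "merged_";
     static final String BUCKET_KEY = "mergeRollup.bucketGranularity";

     @Override
     protected MapModifier getModifier(Map<String, String> taskConfigs, String resultSegmentName) {
       if (resultSegmentName.startsWith(MERGED_PREFIX)) {
         // Only segments produced by the merge get the bucket granularity recorded.
         return new MapModifier("UPDATE", Map.of(BUCKET_KEY, taskConfigs.getOrDefault("bucketGranularity", "DAY")));
       }
       // An empty UPDATE leaves the custom map of any other segment unchanged.
       return new MapModifier("UPDATE", Collections.emptyMap());
     }
   }
   ```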





[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641859333



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
 
   @Override
   protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
-    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections

Review comment:
       `ConvertToRawIndexMinionClusterIntegrationTest` checks the map field of the uploaded segment's ZK metadata. The existing cluster integration tests (e.g. `OfflineClusterIntegrationTest`) cover the upload API, but they upload segments without a `SegmentZKMetadataCustomMapModifier`. Maybe I can modify one of the cluster integration tests to upload segments with the modifier and add tests for it?
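   The diff above switches `PurgeTaskExecutor` from `REPLACE` to `UPDATE`. Here is a minimal sketch of the behavior a test with the modifier could pin down. It assumes that `REPLACE` discards the existing custom map, that `UPDATE` merges the new entries into it, and that a `null` value deletes a key. The enum and class below are simplified stand-ins, not Pinot's actual implementation.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   enum ModifyMode { REPLACE, UPDATE }

   final class CustomMapModifierSketch {
     private final ModifyMode mode;
     private final Map<String, String> customMap;

     CustomMapModifierSketch(ModifyMode mode, Map<String, String> customMap) {
       this.mode = mode;
       this.customMap = customMap;
     }

     // Returns the custom map that would be stored after applying this modifier to an existing one.
     Map<String, String> modifyMap(Map<String, String> existing) {
       Map<String, String> result = new HashMap<>();
       // UPDATE keeps the existing entries; REPLACE starts from an empty map.
       if (mode == ModifyMode.UPDATE && existing != null) {
         result.putAll(existing);
       }
       for (Map.Entry<String, String> entry : customMap.entrySet()) {
         if (entry.getValue() == null) {
           result.remove(entry.getKey()); // assumption: a null value deletes the key
         } else {
           result.put(entry.getKey(), entry.getValue());
         }
       }
       return result;
     }
   }
   ```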





[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644371094



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
       File workingDir)
       throws Exception;
 
+  /**
+   * Returns the segment ZK metadata custom map modifier.
+   */
+  protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(

Review comment:
       Can we move this to `BaseTaskExecutor`?
   
   We have the same abstract function defined in `BaseSingleSegmentConversionExecutor`. After we move this to `BaseTaskExecutor`, we can remove this from both `BaseSingleSegmentConversionExecutor` and `BaseMultipleSegmentsConversionExecutor`
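   A minimal sketch of the suggested refactor. The hook is declared once on the common base class, and the two intermediate executors inherit it instead of each redeclaring it. Class bodies are stripped down, and the hook signature is simplified to a config map plus the result segment name. `PurgeExecutorSketch` and its metadata key are illustrative only.

   ```java
   import java.util.List;
   import java.util.Map;

   // Common base class: the metadata hook is declared once here.
   abstract class BaseTaskExecutorSketch {
     protected abstract Map<String, String> getCustomMapModifier(Map<String, String> taskConfigs,
         String resultSegmentName);
   }

   // The intermediate classes keep only their own conversion contracts; neither redeclares the hook.
   abstract class BaseSingleSegmentConversionExecutorSketch extends BaseTaskExecutorSketch {
     protected abstract String convert(String indexDir);
   }

   abstract class BaseMultipleSegmentsConversionExecutorSketch extends BaseTaskExecutorSketch {
     protected abstract List<String> convert(List<String> indexDirs);
   }

   // A concrete executor implements the hook exactly once (hypothetical metadata key).
   class PurgeExecutorSketch extends BaseSingleSegmentConversionExecutorSketch {
     @Override
     protected String convert(String indexDir) {
       return indexDir + "_purged";
     }

     @Override
     protected Map<String, String> getCustomMapModifier(Map<String, String> taskConfigs, String resultSegmentName) {
       return Map.of("purge.time", taskConfigs.getOrDefault("now", "0"));
     }
   }
   ```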





[GitHub] [incubator-pinot] snleee commented on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-854092399


   LGTM. Thanks for addressing all the comments.



[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641842651



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -296,6 +309,21 @@ public static URI getUploadSegmentURI(URI controllerURI)
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
   }
 
+  public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType)
+      throws URISyntaxException {
+    return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),

Review comment:
       Yes, the original `getURI()` does not work because `segments/mytable/startReplaceSegments?type=OFFLINE` is passed as `path`, and the `URI` constructor percent-encodes the `?` in the path component as `%3F`:
   ```
   private static URI getURI(String protocol, String host, int port, String path)      
       throws URISyntaxException {    
     if (!SUPPORTED_PROTOCOLS.contains(protocol)) {      
       throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));    
     }    
     return new URI(protocol, null, host, port, path, null, null);  
   }
   ```
   So I added a new `getURI()` overload that passes `segments/mytable/startReplaceSegments` as `path` and `type=OFFLINE` as `query`:
   ```
   private static URI getURI(String protocol, String host, int port, String path, String query)      
       throws URISyntaxException {    
     if (!SUPPORTED_PROTOCOLS.contains(protocol)) {      
       throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));    
     }    
     return new URI(protocol, null, host, port, path, query, null);  
   }
   ```
   I tested it locally and it returns the correct URI.
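   A self-contained sketch reproducing the difference with `java.net.URI`'s seven-argument constructor. The `SUPPORTED_PROTOCOLS` set, host and port are assumptions. The path uses a leading `/` because the constructor rejects a relative path when a scheme is given.

   ```java
   import java.net.URI;
   import java.net.URISyntaxException;
   import java.util.Set;

   final class ControllerUris {
     // Assumption: mirrors the SUPPORTED_PROTOCOLS check in the helper quoted above.
     private static final Set<String> SUPPORTED_PROTOCOLS = Set.of("http", "https");

     // Path and query are separate components; a '?' placed inside the path gets percent-encoded.
     static URI getURI(String protocol, String host, int port, String path, String query)
         throws URISyntaxException {
       if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
         throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));
       }
       return new URI(protocol, null, host, port, path, query, null);
     }

     public static void main(String[] args) throws URISyntaxException {
       // Query folded into the path: prints http://localhost:9000/segments/mytable/startReplaceSegments%3Ftype=OFFLINE
       System.out.println(getURI("http", "localhost", 9000, "/segments/mytable/startReplaceSegments?type=OFFLINE", null));
       // Query passed on its own: prints http://localhost:9000/segments/mytable/startReplaceSegments?type=OFFLINE
       System.out.println(getURI("http", "localhost", 9000, "/segments/mytable/startReplaceSegments", "type=OFFLINE"));
     }
   }
   ```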





[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641299163



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -172,7 +185,7 @@ public static URI getDeleteSegmentHttpUri(String host, int port, String rawTable
       String tableType)
       throws URISyntaxException {
     return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
-        rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + tableType));
+        rawTableName + "/" + URIUtils.encode(segmentName) + "?" + TYPE_DELIMITER + tableType));

Review comment:
       This API appears to be deprecated. Why do we need this change?

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -185,7 +198,7 @@ public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int por
       String tableType)
       throws URISyntaxException {
     return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
-        rawTableName + TYPE_DELIMITER + tableType));
+        rawTableName + "?" + TYPE_DELIMITER + tableType));

Review comment:
       Same here

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
##########
@@ -46,6 +48,7 @@ private MinionConstants() {
   public static final String RETRY_SCALE_FACTOR_KEY = "retryScaleFactor";
 
   public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
+  public static final String REPLACE_SEGMENTS_KEY = "replaceSegments";

Review comment:
       `enableSegmentsReplacement`?
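   A minimal sketch of how an executor might read this flag from the task's config map under the suggested name. The key string, the `false` default and the class name are assumptions for illustration, not the merged code.

   ```java
   import java.util.Map;

   final class ReplaceSegmentsFlag {
     // Suggested key name from the review; hypothetical until the constant is actually renamed.
     static final String ENABLE_SEGMENTS_REPLACEMENT_KEY = "enableSegmentsReplacement";

     // Boolean.parseBoolean treats a missing (null) or non-"true" value as false.
     static boolean isReplaceSegmentsEnabled(Map<String, String> taskConfigs) {
       return Boolean.parseBoolean(taskConfigs.get(ENABLE_SEGMENTS_REPLACEMENT_KEY));
     }
   }
   ```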
   

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+        Header segmentZKMetadataCustomMapModifierHeader =
+            new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+                segmentZKMetadataCustomMapModifier.toJsonString());
+
+        List<Header> httpHeaders = new ArrayList<>();
+        httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+        httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));

Review comment:
       Can you explain why we need this?
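   For context on the question, here is a minimal sketch of what this hunk assembles. It builds one header carrying the modifier serialized as JSON, plus whatever authorization headers the token yields, so both travel on the same upload request. `Header` is a stand-in for Apache HttpClient's header type. The header names are placeholders, and this `makeAuthHeader` is hypothetical: it returns no header when no token is configured.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   final class UploadHeaders {
     // Stand-in for org.apache.http.Header.
     static final class Header {
       final String name;
       final String value;

       Header(String name, String value) {
         this.name = name;
         this.value = value;
       }
     }

     // Placeholder header names, for illustration only.
     static final String MODIFIER_HEADER = "Example-Custom-Map-Modifier";
     static final String AUTH_HEADER = "Authorization";

     // Hypothetical helper: no auth header when the token is absent.
     static List<Header> makeAuthHeader(String authToken) {
       if (authToken == null || authToken.isEmpty()) {
         return Collections.emptyList();
       }
       return List.of(new Header(AUTH_HEADER, authToken));
     }

     // The modifier header and the auth headers are combined into one list for a single upload call.
     static List<Header> buildUploadHeaders(String modifierJson, String authToken) {
       List<Header> headers = new ArrayList<>();
       headers.add(new Header(MODIFIER_HEADER, modifierJson));
       headers.addAll(makeAuthHeader(authToken));
       return headers;
     }
   }
   ```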





[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1) Report
   > Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc) (690e902) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/d6154188ed401fdbdea8f78c9675fea0237bb6a6?el=desc) (d615418) will **decrease** coverage by `7.86%`.
   > The diff coverage is `42.79%`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #6975      +/-   ##
   ============================================
   - Coverage     73.31%   65.45%   -7.87%     
     Complexity       12       12              
   ============================================
     Files          1441     1445       +4     
     Lines         71450    71652     +202     
     Branches      10354    10376      +22     
   ============================================
   - Hits          52385    46898    -5487     
   - Misses        15541    21357    +5816     
   + Partials       3524     3397     -127     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `?` | |
   | unittests | `65.45% <42.79%> (-0.07%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/common/minion/MergeRollupTaskMetadata.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01lcmdlUm9sbHVwVGFza01ldGFkYXRhLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...e/pinot/common/minion/MinionTaskMetadataUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01pbmlvblRhc2tNZXRhZGF0YVV0aWxzLmphdmE=) | `0.00% <0.00%> (-75.00%)` | :arrow_down: |
   | [...troller/helix/core/minion/ClusterInfoAccessor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9DbHVzdGVySW5mb0FjY2Vzc29yLmphdmE=) | `32.14% <0.00%> (-47.86%)` | :arrow_down: |
   | [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...ore/minion/rollup/MergeRollupSegmentConverter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vcm9sbHVwL01lcmdlUm9sbHVwU2VnbWVudENvbnZlcnRlci5qYXZh) | `89.28% <ø> (ø)` | |
   | [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `2.59% <0.00%> (-87.24%)` | :arrow_down: |
   | [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `0.00% <0.00%> (-52.95%)` | :arrow_down: |
   | [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
   | [...egments/RealtimeToOfflineSegmentsTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcmVhbHRpbWVfdG9fb2ZmbGluZV9zZWdtZW50cy9SZWFsdGltZVRvT2ZmbGluZVNlZ21lbnRzVGFza0V4ZWN1dG9yLmphdmE=) | `78.26% <0.00%> (-11.45%)` | :arrow_down: |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `19.46% <5.26%> (-38.91%)` | :arrow_down: |
   | ... and [353 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer). Last update [d615418...690e902](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   



[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644326007



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessorFramework.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ *   1. Add the support for roll-up
+ *   2. Add the support to make result segments time aligned
+ *   3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessorFramework {
+  private List<File> _originalIndexDirs;
+  private final File _workingDir;
+  private final String _inputSegmentDir;
+  private final String _outputSegmentDir;
+  private final SegmentProcessorConfig _segmentProcessorConfig;
+
+  private MergeRollupSegmentProcessorFramework(TableConfig tableConfig, Schema schema, String mergeType,

Review comment:
       Done
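   The class Javadoc in this hunk distinguishes concatenating segments from rolling them up, and roll-up is still a TODO in this PR. Here is a minimal sketch of the difference on plain rows. It assumes a single string dimension, a single long metric and SUM as the aggregation. These types are illustrative and are not part of `MergeRollupSegmentProcessorFramework`.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   final class MergeRollupSketch {
     static final class Row {
       final String dimension;
       final long metric;

       Row(String dimension, long metric) {
         this.dimension = dimension;
         this.metric = metric;
       }
     }

     // Concatenate: every input row is kept as-is, in segment order.
     static List<Row> concatenate(List<List<Row>> segments) {
       List<Row> out = new ArrayList<>();
       for (List<Row> segment : segments) {
         out.addAll(segment);
       }
       return out;
     }

     // Roll-up: rows sharing a dimension value collapse into one row whose metric is summed.
     // LinkedHashMap keeps the first-seen order of dimension values.
     static List<Row> rollup(List<List<Row>> segments) {
       Map<String, Long> sums = new LinkedHashMap<>();
       for (Row row : concatenate(segments)) {
         sums.merge(row.dimension, row.metric, Long::sum);
       }
       List<Row> out = new ArrayList<>();
       sums.forEach((dim, sum) -> out.add(new Row(dim, sum)));
       return out;
     }
   }
   ```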

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       Changed to pass `resultSegmentName` in. Once time partitioning is introduced, `MergeRollupTaskExecutor` can decide whether to update the metadata based on the segment name.
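   The hunk above starts the segment replacement protocol. It records a lineage entry with the input segments as `segmentsFrom` and the converted segments as `segmentsTo`, and then uploads the outputs. Here is a minimal sketch of the whole sequence. It assumes a matching "end" call that completes the lineage entry after every upload succeeds; that call does not appear in this hunk. `ReplaceClient` and its methods are hypothetical stand-ins for the controller calls, not the actual `SegmentConversionUtils` API.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   // Hypothetical client for the controller's lineage endpoints and segment upload.
   interface ReplaceClient {
     String startReplaceSegments(String table, List<String> segmentsFrom, List<String> segmentsTo);

     void uploadSegment(String table, String segmentName);

     void endReplaceSegments(String table, String lineageEntryId);
   }

   final class SegmentReplaceFlow {
     // Mirrors the hunk: parse the comma-separated input names, start the lineage entry, then upload.
     static void run(ReplaceClient client, String table, String inputSegmentNames, List<String> outputSegmentNames,
         boolean replaceSegmentsEnabled) {
       String lineageEntryId = null;
       if (replaceSegmentsEnabled) {
         List<String> segmentsFrom =
             Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
         lineageEntryId = client.startReplaceSegments(table, segmentsFrom, outputSegmentNames);
       }
       for (String segmentName : outputSegmentNames) {
         client.uploadSegment(table, segmentName);
       }
       if (replaceSegmentsEnabled) {
         // Assumption: ending the entry is what swaps the new segments in for the old ones.
         client.endReplaceSegments(table, lineageEntryId);
       }
     }
   }
   ```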

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      // Update the segment lineage to indicate that the segment replacement is in progress.
+      String lineageEntryId = null;
+      if (replaceSegmentsEnabled) {
+        List<String> segmentsFrom =
+            Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+        List<String> segmentsTo =
+            segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+        lineageEntryId = SegmentConversionUtils
+            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+      }
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
         String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
+        // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+        SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);

Review comment:
       Updated.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
       File workingDir)
       throws Exception;
 
+  /**
+   * Returns the segment ZK metadata custom map modifier.
+   */
+  protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(

Review comment:
       Updated.





[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641846225



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
  * TODO:
  *   1. Add the support for roll-up
  *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ *   3. Add merge/rollup name prefixes for generated segments
+ *   4. Add the support for realtime table
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+  private static final String INPUT_SEGMENTS_DIR = "input_segments";
+  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,

Review comment:
       Yes, will update this.





[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644379499



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
       File workingDir)
       throws Exception;
 
+  /**
+   * Returns the segment ZK metadata custom map modifier.
+   */
+  protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(

Review comment:
       Updated.



