Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/05/25 19:08:33 UTC
[GitHub] [incubator-pinot] jtao15 opened a new pull request #6975: Merge rollup executor enhancement
jtao15 opened a new pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975
Part of #2715, depends on #6932.
Merge/Rollup task executor enhancement:
1. Wire SegmentProcessorFramework
2. Wire segment replacement protocol
3. Write bucket granularity info into the resulting segment's ZK metadata.
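A minimal sketch of the start/upload/end flow behind item 2, assuming an in-memory lineage table. `LineageSketch` and all of its methods are hypothetical and do not correspond to Pinot's controller API. The sketch also assumes that readers should see either all of `segmentsFrom` or all of `segmentsTo`, never a mix, which is presumably why the lineage entry is recorded before any output segment is uploaded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory lineage table; names do not correspond to Pinot classes.
class LineageSketch {
  enum State { IN_PROGRESS, COMPLETED }

  private static final class Entry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    State state = State.IN_PROGRESS;

    Entry(List<String> segmentsFrom, List<String> segmentsTo) {
      this.segmentsFrom = new ArrayList<>(segmentsFrom);
      this.segmentsTo = new ArrayList<>(segmentsTo);
    }
  }

  private final Map<String, Entry> entries = new HashMap<>();
  private final List<String> uploaded = new ArrayList<>();

  // Step 1: record the intent to replace segmentsFrom with segmentsTo; returns the entry id.
  String startReplace(List<String> segmentsFrom, List<String> segmentsTo) {
    String id = UUID.randomUUID().toString();
    entries.put(id, new Entry(segmentsFrom, segmentsTo));
    return id;
  }

  // Step 2: upload one output segment (only recorded here).
  void upload(String segmentName) {
    uploaded.add(segmentName);
  }

  // Step 3: mark the entry COMPLETED once every output segment has been uploaded.
  void endReplace(String id) {
    Entry entry = entries.get(id);
    if (entry == null) {
      throw new IllegalArgumentException("Unknown lineage entry: " + id);
    }
    if (!uploaded.containsAll(entry.segmentsTo)) {
      throw new IllegalStateException("Not all segmentsTo have been uploaded");
    }
    entry.state = State.COMPLETED;
  }

  // Hides the new segments while a swap is in progress and the old ones after it completes.
  List<String> queryableSegments(List<String> allSegments) {
    List<String> result = new ArrayList<>(allSegments);
    for (Entry entry : entries.values()) {
      result.removeAll(entry.state == State.COMPLETED ? entry.segmentsFrom : entry.segmentsTo);
    }
    return result;
  }
}
```

In this model, a failure anywhere between `startReplace` and `endReplace` leaves the original segments queryable, because the entry never leaves `IN_PROGRESS`.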
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641728244
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+ Header segmentZKMetadataCustomMapModifierHeader =
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+ segmentZKMetadataCustomMapModifier.toJsonString());
+
+ List<Header> httpHeaders = new ArrayList<>();
+ httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+ httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
Review comment:
I'm asking about `authToken` :)
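The header list in this hunk merges two sources: the ZK metadata custom map modifier and the auth token. A rough sketch, assuming the auth header is only added when a non-blank token is configured. The header name `X-Hypothetical-Custom-Map-Modifier` and the class `UploadHeadersSketch` are placeholders; they are not the actual value of `FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER` or the behaviour of `makeAuthHeader()`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; header names are placeholders, not Pinot constants.
class UploadHeadersSketch {
  static final String MODIFIER_HEADER = "X-Hypothetical-Custom-Map-Modifier";
  static final String AUTH_HEADER = "Authorization";

  static List<Map.Entry<String, String>> buildHeaders(String modifierJson, String authToken) {
    List<Map.Entry<String, String>> headers = new ArrayList<>();
    // Always send the ZK metadata custom map modifier for the segment being uploaded.
    headers.add(new SimpleImmutableEntry<>(MODIFIER_HEADER, modifierJson));
    // Assumption: the auth header is appended only when a non-blank token is configured.
    if (authToken != null && !authToken.trim().isEmpty()) {
      headers.add(new SimpleImmutableEntry<>(AUTH_HEADER, authToken));
    }
    return headers;
  }
}
```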
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644255925
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
- return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
Review comment:
Let's add a test for `SegmentZKMetadataCustomMapModifier` in a future PR, when we add the integration test for the segment merge/roll-up task.
Let's add a `TODO` comment for this (e.g., in `BaseMultipleSegmentsConversionExecutor`).
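The switch from `REPLACE` to `UPDATE` matters once several tasks write into the same custom map (item 3 of this PR writes bucket granularity there). A hypothetical sketch of the two modes, assuming `REPLACE` discards the existing map and `UPDATE` overlays the new entries on it. `CustomMapModifierSketch` and the example keys are illustrative, not Pinot's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical re-implementation for illustration; not Pinot's SegmentZKMetadataCustomMapModifier.
class CustomMapModifierSketch {
  enum ModifyMode { REPLACE, UPDATE }

  private final ModifyMode mode;
  private final Map<String, String> customMap;

  CustomMapModifierSketch(ModifyMode mode, Map<String, String> customMap) {
    this.mode = mode;
    this.customMap = new HashMap<>(customMap);
  }

  Map<String, String> modify(Map<String, String> existing) {
    if (mode == ModifyMode.REPLACE || existing == null) {
      // REPLACE: whatever was in the existing map is discarded.
      return new HashMap<>(customMap);
    }
    // UPDATE: keep unrelated keys, overwrite the ones this modifier sets.
    Map<String, String> merged = new HashMap<>(existing);
    merged.putAll(customMap);
    return merged;
  }
}
```

Under this reading, a purge run in `REPLACE` mode would erase granularity info written earlier by a merge/rollup run, which is presumably why the purge executor moves to `UPDATE`.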
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644342355
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessor.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ * 1. Add the support for roll-up
+ * 2. Add the support to make result segments time aligned
+ * 3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessor {
Review comment:
Let's actually put this under `org.apache.pinot.core.minion` and rename it to `MergeRollupConverter`.
We can delete `org.apache.pinot.core.minion.rollup` and `org.apache.pinot.core.minion.segment` in a follow-up PR as a clean-up.
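The class Javadoc above describes concatenating or rolling up the input segments. A minimal sketch of the difference, assuming rows are modeled as (dimension key, metric) pairs and that roll-up sums the metric. Per the TODO list, roll-up itself is not implemented in this PR, and `MergeRollupSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row model: each row is (dimension key, metric value).
class MergeRollupSketch {
  // Concat: append every row from every input segment, unchanged.
  static List<Map.Entry<String, Long>> concat(List<List<Map.Entry<String, Long>>> segments) {
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    for (List<Map.Entry<String, Long>> segment : segments) {
      out.addAll(segment);
    }
    return out;
  }

  // Roll-up: group rows by dimension key and sum the metric (sum is an assumed aggregator).
  static Map<String, Long> rollup(List<List<Map.Entry<String, Long>>> segments) {
    Map<String, Long> out = new LinkedHashMap<>();
    for (List<Map.Entry<String, Long>> segment : segments) {
      for (Map.Entry<String, Long> row : segment) {
        out.merge(row.getKey(), row.getValue(), Long::sum);
      }
    }
    return out;
  }
}
```

With two input segments holding `(us, 1), (eu, 2)` and `(us, 3)`, concat keeps all three rows while roll-up yields `us=4, eu=2`.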
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644326007
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessorFramework.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ * 1. Add the support for roll-up
+ * 2. Add the support to make result segments time aligned
+ * 3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessorFramework {
+ private List<File> _originalIndexDirs;
+ private final File _workingDir;
+ private final String _inputSegmentDir;
+ private final String _outputSegmentDir;
+ private final SegmentProcessorConfig _segmentProcessorConfig;
+
+ private MergeRollupSegmentProcessorFramework(TableConfig tableConfig, Schema schema, String mergeType,
Review comment:
Done
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641733204
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+ Header segmentZKMetadataCustomMapModifierHeader =
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+ segmentZKMetadataCustomMapModifier.toJsonString());
+
+ List<Header> httpHeaders = new ArrayList<>();
+ httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+ httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
Review comment:
Oh, `authToken` was passed before as well.
[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058
# [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ea4d245) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/4944731579de1b17be23404618e1bb4dd8e5e511?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4944731) will **decrease** coverage by `0.08%`.
> The diff coverage is `59.21%`.
```diff
@@ Coverage Diff @@
## master #6975 +/- ##
============================================
- Coverage 73.20% 73.11% -0.09%
Complexity 12 12
============================================
Files 1451 1452 +1
Lines 71832 71969 +137
Branches 10415 10423 +8
============================================
+ Hits 52585 52621 +36
- Misses 15715 15817 +102
+ Partials 3532 3531 -1
```
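The totals in this table are consistent with coverage = hits / lines, where lines = hits + misses + partials. A small worked check, assuming the displayed percentages are truncated to two decimals; `CoverageMath` is illustrative and not Codecov's code. It also suggests one plausible reason the header says 0.08% while the table says -0.09%: the exact change is about -0.089 points, while the difference of the truncated values (73.11 - 73.20) is -0.09.

```java
// Worked check of the coverage numbers above; assumes truncation to two decimals.
class CoverageMath {
  // Tracked lines are hits + misses + partials.
  static long lines(long hits, long misses, long partials) {
    return hits + misses + partials;
  }

  // Coverage in hundredths of a percent, truncated (7320 means 73.20%).
  static long coverageHundredths(long hits, long lines) {
    return hits * 10_000 / lines;
  }

  // Exact coverage change in percentage points.
  static double exactDelta(long baseHits, long baseLines, long headHits, long headLines) {
    return 100.0 * ((double) headHits / headLines - (double) baseHits / baseLines);
  }

  public static void main(String[] args) {
    long baseLines = lines(52585, 15715, 3532); // master: 71832
    long headLines = lines(52621, 15817, 3531); // #6975: 71969
    System.out.println(coverageHundredths(52585, baseLines)); // 7320
    System.out.println(coverageHundredths(52621, headLines)); // 7311
    System.out.printf("%.4f%n", exactDelta(52585, baseLines, 52621, headLines)); // -0.0893
  }
}
```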
| Flag | Coverage Δ | |
|---|---|---|
| integration | `41.73% <10.52%> (-0.17%)` | :arrow_down: |
| unittests | `65.37% <48.68%> (+<0.01%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
| [...he/pinot/plugin/minion/tasks/BaseTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZVRhc2tFeGVjdXRvci5qYXZh) | `83.33% <ø> (ø)` | |
| [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
| [...rt\_to\_raw\_index/ConvertToRawIndexTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvY29udmVydF90b19yYXdfaW5kZXgvQ29udmVydFRvUmF3SW5kZXhUYXNrRXhlY3V0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
| [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
| [...and\_push/SegmentGenerationAndPushTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudF9nZW5lcmF0aW9uX2FuZF9wdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
| [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
| [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `81.01% <57.14%> (-8.82%)` | :arrow_down: |
| [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `88.57% <82.60%> (-6.89%)` | :arrow_down: |
| [...apache/pinot/core/minion/MergeRollupConverter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vTWVyZ2VSb2xsdXBDb252ZXJ0ZXIuamF2YQ==) | `88.52% <88.52%> (ø)` | |
| ... and [37 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4944731...ea4d245](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644252489
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessorFramework.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ * 1. Add the support for roll-up
+ * 2. Add the support to make result segments time aligned
+ * 3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessorFramework {
+ private List<File> _originalIndexDirs;
+ private final File _workingDir;
+ private final String _inputSegmentDir;
+ private final String _outputSegmentDir;
+ private final SegmentProcessorConfig _segmentProcessorConfig;
+
+ private MergeRollupSegmentProcessorFramework(TableConfig tableConfig, Schema schema, String mergeType,
Review comment:
Let's rename this to `MergeRollupSegmentProcessor`.
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
Review comment:
Does this mean that we will always modify the segmentZKMetadata?
How do we control when to modify it and when not to? (e.g., based on our design, there are some cases where we should not mark the granularity)
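One way to address this question, sketched under assumptions: the executor could return an empty result when the task config does not ask for granularity to be marked, and then skip the modifier header entirely. The config key `bucketGranularity` and the class `ConditionalModifierSketch` are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical: decide per task whether the segment ZK metadata custom map should be touched.
class ConditionalModifierSketch {
  static final String GRANULARITY_KEY = "bucketGranularity"; // hypothetical task config key

  static Optional<Map<String, String>> customMapUpdate(Map<String, String> taskConfigs) {
    String granularity = taskConfigs.get(GRANULARITY_KEY);
    if (granularity == null || granularity.isEmpty()) {
      // Nothing to mark: the caller sends no modifier header, so ZK metadata is left as-is.
      return Optional.empty();
    }
    return Optional.of(Collections.singletonMap(GRANULARITY_KEY, granularity));
  }
}
```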
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
- return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
Review comment:
Let's add a test for `SegmentZKMetadataCustomMapModifier` in a future PR, when we add the integration test for the segment merge/roll-up task.
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641692165
##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -172,7 +185,7 @@ public static URI getDeleteSegmentHttpUri(String host, int port, String rawTable
String tableType)
throws URISyntaxException {
return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
- rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + tableType));
+ rawTableName + "/" + URIUtils.encode(segmentName) + "?" + TYPE_DELIMITER + tableType));
Review comment:
I'm changing `TYPE_DELIMITER` because I need to use `type=` instead of `?type=`. The current `getURI(String protocol, String host, int port, String path)` quotes `?`, so I created another `getURI()` function to get the correct URI for the start/end segment replacement APIs. Maybe we can merge these two `getURI()` functions.
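The quoting described here matches the behaviour of `java.net.URI`'s multi-argument constructors: a `?` inside the path argument is percent-encoded, while a separate query argument gets a real `?` separator. A minimal sketch, assuming the existing `getURI(...)` helper passes the whole string as the path; `UriQuotingSketch`, the host, and the segment path are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helpers contrasting path-only and path-plus-query URI construction.
class UriQuotingSketch {
  // The '?' inside the path argument is quoted to %3F, so no query string is produced.
  static String pathOnly(String host, int port, String path) {
    try {
      return new URI("http", null, host, port, path, null, null).toString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(e);
    }
  }

  // Passing the query separately makes URI insert a real '?' separator.
  static String pathAndQuery(String host, int port, String path, String query) {
    try {
      return new URI("http", null, host, port, path, query, null).toString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException(e);
    }
  }
}
```

For example, `pathOnly("localhost", 9000, "/segments/myTable/seg1?type=OFFLINE")` yields `http://localhost:9000/segments/myTable/seg1%3Ftype=OFFLINE`, whereas `pathAndQuery("localhost", 9000, "/segments/myTable/seg1", "type=OFFLINE")` yields `http://localhost:9000/segments/myTable/seg1?type=OFFLINE`. A single helper taking an optional query argument would be one way to merge the two `getURI()` variants.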
[GitHub] [incubator-pinot] codecov-commenter commented on pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058
# [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (374ba8e) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/3c7dfcd8d65bbfeceb4af77d23298515251eaae6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3c7dfcd) will **decrease** coverage by `0.04%`.
> The diff coverage is `27.98%`.
```diff
@@ Coverage Diff @@
## master #6975 +/- ##
============================================
- Coverage 73.23% 73.19% -0.05%
Complexity 12 12
============================================
Files 1437 1441 +4
Lines 71320 71523 +203
Branches 10332 10354 +22
============================================
+ Hits 52230 52349 +119
- Misses 15586 15661 +75
- Partials 3504 3513 +9
```
| Flag | Coverage Δ | |
|---|---|---|
| integration | `41.94% <5.04%> (+0.02%)` | :arrow_up: |
| unittests | `65.21% <22.93%> (-0.10%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...e/pinot/common/minion/MergeRollupTaskMetadata.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01lcmdlUm9sbHVwVGFza01ldGFkYXRhLmphdmE=) | `0.00% <0.00%> (ø)` | |
| [...e/pinot/common/minion/MinionTaskMetadataUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01pbmlvblRhc2tNZXRhZGF0YVV0aWxzLmphdmE=) | `50.00% <0.00%> (-25.00%)` | :arrow_down: |
| [...troller/helix/core/minion/ClusterInfoAccessor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9DbHVzdGVySW5mb0FjY2Vzc29yLmphdmE=) | `71.42% <0.00%> (-8.58%)` | :arrow_down: |
| [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
| [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
| [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
| [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
| [...inion/tasks/merge\_rollup/MergeRollupTaskUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza1V0aWxzLmphdmE=) | `11.11% <11.11%> (ø)` | |
| [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `80.51% <50.00%> (-9.32%)` | :arrow_down: |
| [...apache/pinot/core/minion/rollup/CollectorType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vcm9sbHVwL0NvbGxlY3RvclR5cGUuamF2YQ==) | `60.00% <57.14%> (ø)` | |
| ... and [40 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3c7dfcd...374ba8e](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641760045
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+ tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+ segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+ }
- MergeRollupSegmentConverter rollupSegmentConverter =
- new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
- .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
- .setTableConfig(tableConfig).build();
+ // Aggregations using configured Collector
+ List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+ CollectorConfig collectorConfig =
+ getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+ Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+ "Only 'CONCAT' mode is currently supported.");
+ segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
- List<File> resultFiles = rollupSegmentConverter.convert();
+ // Segment config
+ if (numRecordsPerSegment != null) {
+ SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+ segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
+ }
+
+ SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
+
+ File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+ Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
+ inputSegmentsDir.getAbsolutePath(), taskType);
+ for (File indexDir : originalIndexDirs) {
+ FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+ }
+ File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
+ Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
+ outputSegmentsDir.getAbsolutePath(), taskType);
+
+ SegmentProcessorFramework segmentProcessorFramework =
+ new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+ try {
+ segmentProcessorFramework.processSegments();
+ } finally {
+ segmentProcessorFramework.cleanup();
+ }
+
+ long endMillis = System.currentTimeMillis();
+ LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
List<SegmentConversionResult> results = new ArrayList<>();
- for (File file : resultFiles) {
+ for (File file : outputSegmentsDir.listFiles()) {
String outputSegmentName = file.getName();
results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
}
+
+ @Override
+ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig) {
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
+ .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
+ pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY)));
Review comment:
Do we have a guarantee that granularity values are always upper case (e.g. `DAILY`, `HOURLY`)?
I think all of the following inputs should work: `hourly`, `Hourly`, `HOURLY`, etc.
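One way to accept any casing is to normalize the value once, before it is written to the segment ZK metadata. Below is a minimal sketch. It assumes a hypothetical `BucketGranularity` enum that contains only the two values named above. It does not use any Pinot class.

```java
import java.util.Locale;

/** Hypothetical subset of bucket granularities; not an existing Pinot type. */
enum BucketGranularity {
  HOURLY, DAILY;

  /**
   * Parses a user-supplied granularity case-insensitively, so "hourly", "Hourly"
   * and "HOURLY" all map to HOURLY. Locale.ROOT avoids locale-specific casing
   * surprises (e.g. the Turkish dotless i). Unknown values throw IllegalArgumentException.
   */
  static BucketGranularity parse(String raw) {
    if (raw == null) {
      throw new IllegalArgumentException("Granularity must not be null");
    }
    return BucketGranularity.valueOf(raw.trim().toUpperCase(Locale.ROOT));
  }
}
```

With this approach the executor would store `parse(raw).name()`, so the metadata always holds the canonical upper-case spelling regardless of how the task config was written.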
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+ tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+ segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+ }
- MergeRollupSegmentConverter rollupSegmentConverter =
- new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
- .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
- .setTableConfig(tableConfig).build();
+ // Aggregations using configured Collector
+ List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+ CollectorConfig collectorConfig =
+ getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+ Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+ "Only 'CONCAT' mode is currently supported.");
+ segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
- List<File> resultFiles = rollupSegmentConverter.convert();
+ // Segment config
+ if (numRecordsPerSegment != null) {
Review comment:
Let's parse the string to an integer here instead of passing the string value to `getSegmentConfig()`. Also, we can use `1000000` as the default value.
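This sketch shows the suggested parsing with the proposed 1,000,000 default. The class and method names are hypothetical. The default value follows the review suggestion and is not a released constant. Invalid or non-positive values fail fast instead of reaching the segment processor.

```java
/** Hypothetical helper; the default mirrors the review suggestion, not an existing Pinot constant. */
final class MaxRecordsConfig {
  /** Default suggested in review: one million records per output segment. */
  static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 1_000_000;

  private MaxRecordsConfig() {
  }

  /** Parses the raw config value, falling back to the default when it is absent or blank. */
  static int parseMaxNumRecordsPerSegment(String raw) {
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
    }
    int value;
    try {
      value = Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid maxNumRecordsPerSegment: " + raw, e);
    }
    if (value <= 0) {
      throw new IllegalArgumentException("maxNumRecordsPerSegment must be positive, got: " + value);
    }
    return value;
  }
}
```

Because the helper always returns a usable value, the executor could set a segment config unconditionally instead of only when the key is present.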
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+ tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+ segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+ }
- MergeRollupSegmentConverter rollupSegmentConverter =
- new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
- .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
- .setTableConfig(tableConfig).build();
+ // Aggregations using configured Collector
+ List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+ CollectorConfig collectorConfig =
+ getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+ Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+ "Only 'CONCAT' mode is currently supported.");
+ segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
- List<File> resultFiles = rollupSegmentConverter.convert();
+ // Segment config
+ if (numRecordsPerSegment != null) {
+ SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+ segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
Review comment:
What happens if we don't set `SegmentConfig`?
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
Review comment:
Instead of using `SegmentProcessorFramework` directly, can we wrap it in a separate class, similar to `MergeRollupSegmentConverter`?
That way, we can reuse `MergeRollupSegmentConverter` in `SegmentMergeCommand`.
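Here is a rough sketch of what such a wrapper could own. It handles the input/output directory layout and the process-then-cleanup lifecycle that `convert()` currently inlines. Everything here is hypothetical. `SegmentProcessor` stands in for `SegmentProcessorFramework`, and `ReusableSegmentConverter` is not an existing class. Only JDK file APIs are used, so a minion executor and a CLI command could both call `convert()`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/** Stand-in for SegmentProcessorFramework: processes inputDir into outputDir, then releases resources. */
interface SegmentProcessor {
  void process(Path inputDir, Path outputDir) throws Exception;

  default void cleanup() {
  }
}

/** Hypothetical reusable converter owning directory setup and the process/cleanup lifecycle. */
final class ReusableSegmentConverter {
  private final List<Path> inputSegmentDirs;
  private final Path workingDir;
  private final SegmentProcessor processor;

  ReusableSegmentConverter(List<Path> inputSegmentDirs, Path workingDir, SegmentProcessor processor) {
    this.inputSegmentDirs = inputSegmentDirs;
    this.workingDir = workingDir;
    this.processor = processor;
  }

  /** Copies inputs into workingDir/input_segments, runs the processor, and returns the output entries sorted. */
  List<Path> convert() throws Exception {
    Path inputDir = Files.createDirectories(workingDir.resolve("input_segments"));
    Path outputDir = Files.createDirectories(workingDir.resolve("output_segments"));
    for (Path segmentDir : inputSegmentDirs) {
      copyRecursively(segmentDir, inputDir.resolve(segmentDir.getFileName().toString()));
    }
    try {
      processor.process(inputDir, outputDir);
    } finally {
      processor.cleanup(); // always release resources, mirroring the try/finally in convert()
    }
    List<Path> results = new ArrayList<>();
    try (Stream<Path> entries = Files.list(outputDir)) {
      entries.sorted().forEach(results::add);
    }
    return results;
  }

  /** Files.walk visits a directory before its children, so parents exist before files are copied. */
  private static void copyRecursively(Path source, Path target) throws IOException {
    try (Stream<Path> paths = Files.walk(source)) {
      for (Path path : (Iterable<Path>) paths::iterator) {
        Path dest = target.resolve(source.relativize(path).toString());
        if (Files.isDirectory(path)) {
          Files.createDirectories(dest);
        } else {
          Files.copy(path, dest);
        }
      }
    }
  }
}
```

Unlike the `mkdirs()` precondition in the diff, `Files.createDirectories` does not fail if the directory already exists. Keep that difference in mind if a stale working directory should be treated as an error.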
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
- return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
Review comment:
Is there an existing test for the segment upload API? Can you check whether there's a test for `SegmentZKMetadataCustomMapModifier`? If so, we can add a test covering this change (`REPLACE -> UPDATE`).
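A test for this change would presumably check that `UPDATE` keeps entries written by other tasks, such as the merge/rollup bucket granularity added in this PR, while `REPLACE` drops them. The sketch below models those assumed semantics with a stand-in class. It is not Pinot's `SegmentZKMetadataCustomMapModifier`, and the real class's behavior should be confirmed before writing the actual test.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Stand-in modelling the assumed semantics of the two modes (not Pinot's class):
 * REPLACE discards the existing custom map, UPDATE merges new entries into a copy of it.
 */
final class CustomMapModifierSketch {
  enum ModifyMode { REPLACE, UPDATE }

  private CustomMapModifierSketch() {
  }

  static Map<String, String> modify(ModifyMode mode, Map<String, String> existing, Map<String, String> incoming) {
    if (mode == ModifyMode.REPLACE || existing == null) {
      return new HashMap<>(incoming); // old entries are lost
    }
    Map<String, String> merged = new HashMap<>(existing);
    merged.putAll(incoming); // incoming keys win, unrelated keys survive
    return merged;
  }
}
```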
##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -296,6 +309,21 @@ public static URI getUploadSegmentURI(URI controllerURI)
return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
}
+ public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType)
+ throws URISyntaxException {
+ return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),
Review comment:
I remember that using `getURI()` somehow didn't work, so I ended up using `new URI(...)`. Can you double-check this? See https://github.com/apache/incubator-pinot/pull/6094.
```
http://localhost:18998/segments/mytable/startReplaceSegments%3Ftype=OFFLINE <- getURI(),
http://localhost:18998/segments/mytable/startReplaceSegments?type=OFFLINE <- new URI(..)
```
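The `%3F` above matches how the multi-argument `java.net.URI` constructors treat a `?` placed in the path component. These constructors quote any character that is not legal in a path, and `?` is not legal there. The demo below reproduces both URLs with the plain JDK. It assumes that `getURI()` forwards its path argument to such a constructor, which has not been verified against the Pinot source here.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Shows why a query string glued onto the path gets escaped by java.net.URI. */
final class ReplaceSegmentsUriDemo {
  private ReplaceSegmentsUriDemo() {
  }

  /** Query inside the path argument: '?' is quoted as %3F because it is illegal in a path. */
  static URI queryInPath() throws URISyntaxException {
    return new URI("http", null, "localhost", 18998, "/segments/mytable/startReplaceSegments?type=OFFLINE", null, null);
  }

  /** Query passed as its own argument: '?' is emitted as the query delimiter. */
  static URI queryAsComponent() throws URISyntaxException {
    return new URI("http", null, "localhost", 18998, "/segments/mytable/startReplaceSegments", "type=OFFLINE", null);
  }
}
```

If that assumption holds, passing `type=OFFLINE` through the query argument should avoid the escaping and still let the builder handle scheme, host and port. The whole-string `new URI(...)` used in #6094 also avoids it.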
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+ tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
Review comment:
Why do we read the partition config from the table config and pass it to the partitioner of the segment processing framework?
We should handle this in the task scheduler: when custom partitioning is enabled, the scheduler should only pick segments from the same partition.
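On the scheduler side, the suggestion amounts to bucketing candidate segments by partition before building tasks, so that each merge task only sees one partition. The sketch below is a minimal version of that idea. It assumes each segment maps to exactly one partition id. `SegmentPartitionInfo` and `PartitionAwareSegmentSelector` are hypothetical names, and segments spanning several partitions would need separate handling, which is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical view of a segment's partition metadata (one partition id per segment assumed). */
final class SegmentPartitionInfo {
  final String segmentName;
  final int partitionId;

  SegmentPartitionInfo(String segmentName, int partitionId) {
    this.segmentName = segmentName;
    this.partitionId = partitionId;
  }
}

final class PartitionAwareSegmentSelector {
  private PartitionAwareSegmentSelector() {
  }

  /** Groups candidates by partition id; TreeMap keeps partitions ordered for deterministic task creation. */
  static Map<Integer, List<String>> groupByPartition(List<SegmentPartitionInfo> candidates) {
    Map<Integer, List<String>> groups = new TreeMap<>();
    for (SegmentPartitionInfo info : candidates) {
      groups.computeIfAbsent(info.partitionId, k -> new ArrayList<>()).add(info.segmentName);
    }
    return groups;
  }
}
```

The scheduler could then emit one task per map entry. The executor would no longer need a partitioner, because its inputs would already be single-partition.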
[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058
# [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4b54e4f) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/211cf8a8b9476a9756d2b40f1d0660cfcb1d4235?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (211cf8a) will **decrease** coverage by `7.83%`.
> The diff coverage is `29.16%`.
> :exclamation: Current head 4b54e4f differs from pull request most recent head 4806a5f. Consider uploading reports for the commit 4806a5f to get more accurate results
```diff
@@ Coverage Diff @@
## master #6975 +/- ##
============================================
- Coverage 73.30% 65.47% -7.84%
Complexity 12 12
============================================
Files 1447 1447
Lines 71627 71734 +107
Branches 10391 10400 +9
============================================
- Hits 52506 46965 -5541
- Misses 15585 21366 +5781
+ Partials 3536 3403 -133
```
| Flag | Coverage Δ | |
|---|---|---|
| integration | `?` | |
| unittests | `65.47% <29.16%> (-0.07%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
| [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `2.59% <0.00%> (-87.24%)` | :arrow_down: |
| [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `0.00% <0.00%> (-52.95%)` | :arrow_down: |
| [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
| [...egments/RealtimeToOfflineSegmentsTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcmVhbHRpbWVfdG9fb2ZmbGluZV9zZWdtZW50cy9SZWFsdGltZVRvT2ZmbGluZVNlZ21lbnRzVGFza0V4ZWN1dG9yLmphdmE=) | `78.26% <0.00%> (-11.45%)` | :arrow_down: |
| [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `19.46% <5.26%> (-38.91%)` | :arrow_down: |
| [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `65.71% <58.62%> (-29.75%)` | :arrow_down: |
| [...a/org/apache/pinot/minion/metrics/MinionMeter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtbWluaW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9taW5pb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../apache/pinot/minion/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtbWluaW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9taW5pb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| ... and [344 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [211cf8a...4806a5f](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644338809
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
Review comment:
I think that we should pass `SegmentConversionResult` instead of `resultSegmentName`. It has a `customProperties` field that we can probably use.
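A minimal sketch of why passing the whole result object helps: the per-segment hook can read whatever the converter attached to that particular output segment, instead of only its name. `ConversionResultSketch`, `CustomMapFromResult`, and the `mergeRollup.bucketGranularity` key are hypothetical stand-ins, not Pinot's `SegmentConversionResult` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a conversion result: the output segment name
// plus free-form custom properties produced by the converter.
final class ConversionResultSketch {
  final String segmentName;
  final Map<String, Object> customProperties = new HashMap<>();

  ConversionResultSketch(String segmentName) {
    this.segmentName = segmentName;
  }
}

final class CustomMapFromResult {
  // Hypothetical key; the key name actually used by the PR is not shown here.
  static final String BUCKET_GRANULARITY_KEY = "mergeRollup.bucketGranularity";

  // Copies only the properties this task cares about into the per-segment
  // custom map entries, converted to strings.
  static Map<String, String> customMapFor(ConversionResultSketch result) {
    Map<String, String> customMap = new HashMap<>();
    Object granularity = result.customProperties.get(BUCKET_GRANULARITY_KEY);
    if (granularity != null) {
      customMap.put(BUCKET_GRANULARITY_KEY, granularity.toString());
    }
    return customMap;
  }
}
```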
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessor.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ * 1. Add the support for roll-up
+ * 2. Add the support to make result segments time aligned
+ * 3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessor {
Review comment:
Let's actually put this under `org.apache.pinot.core.minion` and rename it to `MergeRollupConverter`.
We can delete `org.apache.pinot.core.minion.rollup` and `org.apache.pinot.core.minion.segment` in a follow-up PR as clean-up.
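The class Javadoc distinguishes concatenation from roll-up, and only concatenation is supported at this point (roll-up is listed as a TODO). A toy illustration of the difference, assuming rows are grouped by one dimension plus a time bucket and that the metric is summed; `MergeRow` and `MergeRollupSketch` are hypothetical and do not reflect how the processor is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy row: one dimension value, a time bucket, and one metric.
final class MergeRow {
  final String dimension;
  final long timeBucket;
  final long metric;

  MergeRow(String dimension, long timeBucket, long metric) {
    this.dimension = dimension;
    this.timeBucket = timeBucket;
    this.metric = metric;
  }
}

final class MergeRollupSketch {
  // Concatenate: keep every row from every input segment, in input order.
  static List<MergeRow> concatenate(List<List<MergeRow>> segments) {
    List<MergeRow> out = new ArrayList<>();
    for (List<MergeRow> segment : segments) {
      out.addAll(segment);
    }
    return out;
  }

  // Roll-up: collapse rows with the same (dimension, timeBucket) into one row,
  // summing the metric. SUM is only an example aggregation.
  static List<MergeRow> rollup(List<List<MergeRow>> segments) {
    Map<List<Object>, MergeRow> grouped = new LinkedHashMap<>();
    for (MergeRow row : concatenate(segments)) {
      List<Object> key = Arrays.asList(row.dimension, row.timeBucket);
      MergeRow previous = grouped.get(key);
      grouped.put(key, previous == null
          ? row
          : new MergeRow(row.dimension, row.timeBucket, previous.metric + row.metric));
    }
    return new ArrayList<>(grouped.values());
  }
}
```

With input segments `[(us, 1, 5), (eu, 1, 2)]` and `[(us, 1, 3)]`, concatenation keeps all three rows, while roll-up yields `(us, 1, 8)` and `(eu, 1, 2)`.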
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
File workingDir)
throws Exception;
+ /**
+ * Returns the segment ZK metadata custom map modifier.
+ */
+ protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
Review comment:
Can we move this to `BaseTaskExecutor`?
We have the same abstract function defined in `BaseSingleSegmentConversionExecutor`. After we move it to `BaseTaskExecutor`, we can remove it from both `BaseSingleSegmentConversionExecutor` and `BaseMultipleSegmentsConversionExecutor`.
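The suggested refactoring amounts to declaring the hook once in the common base class. A simplified sketch with hypothetical class names, using a plain `Map<String, String>` in place of `SegmentZKMetadataCustomMapModifier`; the real executors carry much more than this.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified hierarchy: the hook is declared once in the shared base.
abstract class BaseTaskExecutorSketch {
  // Each concrete executor decides which custom map entries its output segments get.
  protected abstract Map<String, String> getCustomMapEntries(String taskType);
}

// Intermediate bases no longer re-declare the hook.
abstract class BaseSingleSegmentConversionExecutorSketch extends BaseTaskExecutorSketch {
}

abstract class BaseMultipleSegmentsConversionExecutorSketch extends BaseTaskExecutorSketch {
}

final class MergeRollupExecutorSketch extends BaseMultipleSegmentsConversionExecutorSketch {
  @Override
  protected Map<String, String> getCustomMapEntries(String taskType) {
    // Hypothetical marker entry for merged output segments.
    return Collections.singletonMap(taskType + ".merged", "true");
  }
}

final class PurgeExecutorSketch extends BaseSingleSegmentConversionExecutorSketch {
  @Override
  protected Map<String, String> getCustomMapEntries(String taskType) {
    return Collections.emptyMap();
  }
}
```

Both executor families inherit the same abstract declaration, so neither intermediate base class needs to repeat it.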
[GitHub] [incubator-pinot] jtao15 commented on pull request #6975: Merge rollup executor enhancement
jtao15 commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-851707013
Updated the PR based on the previous comments. @snleee
[GitHub] [incubator-pinot] snleee commented on pull request #6975: Merge rollup executor enhancement
snleee commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-854092399
LGTM. Thanks for addressing all the comments.
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644370006
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
Review comment:
Updated.
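The excerpt above starts a segment-replacement lineage entry before uploading the merged segments. A simplified in-memory model of that protocol, assuming a matching end step after all uploads succeed (not shown in the excerpt) and assuming routing hides the new segments while the entry is in progress and the old ones once it completes. `LineageSketch` and its methods are hypothetical, not Pinot's controller API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

// Simplified in-memory model of segment lineage (not Pinot's implementation).
final class LineageSketch {
  enum State { IN_PROGRESS, COMPLETED }

  private static final class Entry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    State state = State.IN_PROGRESS;

    Entry(List<String> segmentsFrom, List<String> segmentsTo) {
      this.segmentsFrom = segmentsFrom;
      this.segmentsTo = segmentsTo;
    }
  }

  private final Map<String, Entry> entries = new HashMap<>();

  // Same parsing as the diff: comma-separated input segment names, trimmed.
  static List<String> parseSegmentNames(String csv) {
    return Arrays.stream(csv.split(",")).map(String::trim).collect(Collectors.toList());
  }

  // Start: record the intended swap before any output segment is uploaded.
  String startReplace(List<String> segmentsFrom, List<String> segmentsTo) {
    String lineageEntryId = UUID.randomUUID().toString();
    entries.put(lineageEntryId, new Entry(segmentsFrom, segmentsTo));
    return lineageEntryId;
  }

  // End: called only after every output segment has been uploaded.
  void endReplace(String lineageEntryId) {
    Entry entry = entries.get(lineageEntryId);
    if (entry == null) {
      throw new IllegalArgumentException("Unknown lineage entry: " + lineageEntryId);
    }
    entry.state = State.COMPLETED;
  }

  // Routing view: while IN_PROGRESS the new segments stay hidden; once COMPLETED
  // the old ones are hidden, so a query never sees both copies of the data.
  Set<String> queryableSegments(Set<String> allSegments) {
    Set<String> visible = new HashSet<>(allSegments);
    for (Entry entry : entries.values()) {
      visible.removeAll(entry.state == State.IN_PROGRESS ? entry.segmentsTo : entry.segmentsFrom);
    }
    return visible;
  }
}
```

Under these assumptions, the two-phase design keeps merged rows from being double-counted while the upload loop is still running.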
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641685492
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+ Header segmentZKMetadataCustomMapModifierHeader =
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+ segmentZKMetadataCustomMapModifier.toJsonString());
+
+ List<Header> httpHeaders = new ArrayList<>();
+ httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+ httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
Review comment:
I'm adding the `SegmentZKMetadataCustomMapModifier` to the HTTP headers so that the upload segment API updates the segment's custom map accordingly. For merged segments, we want to record their bucket granularity in the custom map so that the scheduler can tell which segments have already been merged.
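A sketch of what the controller-side update and the scheduler-side check could look like. The `REPLACE`/`UPDATE` modes, the key name, and the class are illustrative assumptions; serialization into the `SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER` header (via `toJsonString()` in the diff) is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a custom map modifier sent with a segment upload.
final class CustomMapModifierSketch {
  enum ModifyMode { REPLACE, UPDATE }

  // Hypothetical key recording the bucket granularity of a merged segment.
  static final String BUCKET_GRANULARITY_KEY = "mergeRollup.bucketGranularity";

  private final ModifyMode mode;
  private final Map<String, String> customMap;

  CustomMapModifierSketch(ModifyMode mode, Map<String, String> customMap) {
    this.mode = mode;
    this.customMap = new HashMap<>(customMap);
  }

  // Applied to the segment's existing custom map on upload:
  // REPLACE discards the old map, UPDATE overlays the new entries on it.
  Map<String, String> modify(Map<String, String> existing) {
    if (mode == ModifyMode.REPLACE || existing == null) {
      return new HashMap<>(customMap);
    }
    Map<String, String> merged = new HashMap<>(existing);
    merged.putAll(customMap);
    return merged;
  }

  // Scheduler-side check: treat a segment as merged if it carries a granularity.
  static boolean isAlreadyMerged(Map<String, String> segmentCustomMap) {
    return segmentCustomMap != null && segmentCustomMap.containsKey(BUCKET_GRANULARITY_KEY);
  }
}
```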
[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058
# [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4b54e4f) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/211cf8a8b9476a9756d2b40f1d0660cfcb1d4235?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (211cf8a) will **decrease** coverage by `0.02%`.
> The diff coverage is `38.33%`.
> :exclamation: Current head 4b54e4f differs from pull request most recent head 4806a5f. Consider uploading reports for the commit 4806a5f to get more accurate results
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6975/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## master #6975 +/- ##
============================================
- Coverage 73.30% 73.28% -0.03%
Complexity 12 12
============================================
Files 1447 1447
Lines 71627 71734 +107
Branches 10391 10400 +9
============================================
+ Hits 52506 52567 +61
- Misses 15585 15625 +40
- Partials 3536 3542 +6
```
| Flag | Coverage Δ | |
|---|---|---|
| integration | `41.86% <9.16%> (+0.09%)` | :arrow_up: |
| unittests | `65.47% <29.16%> (-0.07%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
| [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
| [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
| [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
| [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `80.51% <50.00%> (-9.32%)` | :arrow_down: |
| [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `65.71% <58.62%> (-29.75%)` | :arrow_down: |
| [...egments/RealtimeToOfflineSegmentsTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcmVhbHRpbWVfdG9fb2ZmbGluZV9zZWdtZW50cy9SZWFsdGltZVRvT2ZmbGluZVNlZ21lbnRzVGFza0V4ZWN1dG9yLmphdmE=) | `89.85% <100.00%> (+0.14%)` | :arrow_up: |
| [...org/apache/pinot/core/minion/rollup/MergeType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vcm9sbHVwL01lcmdlVHlwZS5qYXZh) | `60.00% <0.00%> (-20.00%)` | :arrow_down: |
| [...ller/helix/core/minion/TaskTypeMetricsUpdater.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9UYXNrVHlwZU1ldHJpY3NVcGRhdGVyLmphdmE=) | `80.00% <0.00%> (-20.00%)` | :arrow_down: |
| [...operator/filter/RangeIndexBasedFilterOperator.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9maWx0ZXIvUmFuZ2VJbmRleEJhc2VkRmlsdGVyT3BlcmF0b3IuamF2YQ==) | `40.42% <0.00%> (-4.26%)` | :arrow_down: |
| ... and [23 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [211cf8a...4806a5f](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
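The Coverage row in these reports can be reproduced from the other rows of the diff table. A small check, assuming coverage is hits / (hits + misses + partials) with the result rounded down to two decimals; `CoverageSketch` is a hypothetical helper, not part of Codecov.

```java
import java.util.Locale;

final class CoverageSketch {
  // Coverage in basis points (hundredths of a percent), rounded down.
  // Partially covered lines count as not covered.
  static long coverageBasisPoints(long hits, long misses, long partials) {
    long lines = hits + misses + partials;
    return hits * 10_000L / lines;
  }

  // Formats basis points as a percentage string such as "73.28%".
  static String format(long basisPoints) {
    return String.format(Locale.ROOT, "%d.%02d%%", basisPoints / 100, basisPoints % 100);
  }
}
```

For the table above, hits + misses + partials equals the Lines row (71627 on master, 71734 for #6975), and the rounded-down ratios are 73.30% and 73.28%. The same rule reproduces 73.20% and 73.11% in the later report, where ordinary rounding would give 73.21% and 73.12%.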
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641846397
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+ tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
Review comment:
You are right; I will remove this.
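The diff reads `MAX_NUM_RECORDS_PER_SEGMENT` from the task configs, and the TODO it removes describes moving from an explicit partition count to a per-segment row limit. Under that model, the number of output segments is a ceiling division. A minimal sketch with a hypothetical helper, not the framework's code:

```java
final class OutputSegmentsSketch {
  // Task config values arrive as strings; parse and validate the limit.
  static long parseMaxRecords(String value) {
    long max = Long.parseLong(value.trim());
    if (max <= 0) {
      throw new IllegalArgumentException("maxNumRecordsPerSegment must be positive: " + value);
    }
    return max;
  }

  // Ceiling division: each output segment holds at most maxRecordsPerSegment rows.
  static long numOutputSegments(long totalRecords, long maxRecordsPerSegment) {
    if (totalRecords == 0) {
      return 0;
    }
    return (totalRecords + maxRecordsPerSegment - 1) / maxRecordsPerSegment;
  }
}
```

For example, 2,500,000 input rows with a limit of 1,000,000 would produce three output segments.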
[GitHub] [incubator-pinot] snleee merged pull request #6975: Merge rollup executor enhancement
snleee merged pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975
[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058
# [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ea4d245) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/4944731579de1b17be23404618e1bb4dd8e5e511?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4944731) will **decrease** coverage by `0.08%`.
> The diff coverage is `59.21%`.
```diff
@@             Coverage Diff              @@
##             master    #6975      +/-   ##
============================================
- Coverage     73.20%   73.11%   -0.09%
  Complexity       12       12
============================================
  Files          1451     1452       +1
  Lines         71832    71969     +137
  Branches      10415    10423       +8
============================================
+ Hits          52585    52621      +36
- Misses        15715    15817     +102
+ Partials       3532     3531       -1
```
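The headline figure (`0.08%`) and the table delta (`-0.09%`) can look inconsistent. The Java sketch below cross-checks them. It assumes coverage is `Hits / Lines` (here `Lines = Hits + Misses + Partials`) truncated to two decimals, which matches Codecov's documented defaults (`precision: 2`, `round: down`). Under that assumption, the table subtracts the already-truncated percentages, while the headline truncates the unrounded difference. The class and method names are illustrative.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class CoverageDelta {
  private static final BigDecimal HUNDRED = BigDecimal.valueOf(100);

  // Coverage % = hits / lines * 100, truncated toward zero to two decimals (assumed "round: down").
  static BigDecimal roundedPct(long hits, long lines) {
    return BigDecimal.valueOf(hits).multiply(HUNDRED).divide(BigDecimal.valueOf(lines), 2, RoundingMode.DOWN);
  }

  // The same ratio kept at high precision, used for the unrounded delta.
  static BigDecimal exactPct(long hits, long lines) {
    return BigDecimal.valueOf(hits).multiply(HUNDRED).divide(BigDecimal.valueOf(lines), 10, RoundingMode.HALF_EVEN);
  }

  public static void main(String[] args) {
    long masterHits = 52585, masterLines = 71832;
    long prHits = 52621, prLines = 71969;

    BigDecimal master = roundedPct(masterHits, masterLines);
    BigDecimal pr = roundedPct(prHits, prLines);
    // Table delta: difference of the already-truncated percentages.
    BigDecimal tableDelta = pr.subtract(master);
    // Headline delta: difference of the exact percentages, truncated afterwards.
    BigDecimal headlineDelta = exactPct(prHits, prLines).subtract(exactPct(masterHits, masterLines))
        .setScale(2, RoundingMode.DOWN);

    System.out.println("master=" + master + "% pr=" + pr + "% table=" + tableDelta + "% headline=" + headlineDelta + "%");
  }
}
```

Running it prints `master=73.20% pr=73.11% table=-0.09% headline=-0.08%`. Under these assumptions, both numbers in the report are consistent with each other.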
| Flag | Coverage Δ | |
|---|---|---|
| integration | `41.73% <10.52%> (-0.17%)` | :arrow_down: |
| unittests | `65.37% <48.68%> (+<0.01%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
| [...he/pinot/plugin/minion/tasks/BaseTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZVRhc2tFeGVjdXRvci5qYXZh) | `83.33% <ø> (ø)` | |
| [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `32.14% <0.00%> (-20.80%)` | :arrow_down: |
| [...rt\_to\_raw\_index/ConvertToRawIndexTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvY29udmVydF90b19yYXdfaW5kZXgvQ29udmVydFRvUmF3SW5kZXhUYXNrRXhlY3V0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
| [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
| [...and\_push/SegmentGenerationAndPushTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudF9nZW5lcmF0aW9uX2FuZF9wdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
| [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.98% <5.26%> (-4.40%)` | :arrow_down: |
| [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `81.01% <57.14%> (-8.82%)` | :arrow_down: |
| [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `88.57% <82.60%> (-6.89%)` | :arrow_down: |
| [...apache/pinot/core/minion/MergeRollupConverter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vTWVyZ2VSb2xsdXBDb252ZXJ0ZXIuamF2YQ==) | `88.52% <88.52%> (ø)` | |
| ... and [37 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4944731...ea4d245](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641844363
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+ tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+ segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+ }
- MergeRollupSegmentConverter rollupSegmentConverter =
- new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
- .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
- .setTableConfig(tableConfig).build();
+ // Aggregations using configured Collector
+ List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+ CollectorConfig collectorConfig =
+ getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+ Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+ "Only 'CONCAT' mode is currently supported.");
+ segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
- List<File> resultFiles = rollupSegmentConverter.convert();
+ // Segment config
+ if (numRecordsPerSegment != null) {
+ SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+ segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
+ }
+
+ SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
+
+ File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+ Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
+ inputSegmentsDir.getAbsolutePath(), taskType);
+ for (File indexDir : originalIndexDirs) {
+ FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+ }
+ File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
+ Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
+ outputSegmentsDir.getAbsolutePath(), taskType);
+
+ SegmentProcessorFramework segmentProcessorFramework =
+ new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+ try {
+ segmentProcessorFramework.processSegments();
+ } finally {
+ segmentProcessorFramework.cleanup();
+ }
+
+ long endMillis = System.currentTimeMillis();
+ LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
List<SegmentConversionResult> results = new ArrayList<>();
- for (File file : resultFiles) {
+ for (File file : outputSegmentsDir.listFiles()) {
String outputSegmentName = file.getName();
results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
}
+
+ @Override
+ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig) {
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
+ .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
+ pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY)));
Review comment:
This will be controlled on the scheduler side: the scheduler should call `MergeRollupTaskUtils.getAllMergeProperties()` to get the granularity and use upper case when creating the task config. To make it less error-prone, maybe we can use `pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY).toUpperCase()`.
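The following is a minimal sketch of the defensive upper-casing suggested above. It assumes the granularity arrives as a plain string in the task configs. The key names are hypothetical placeholders for `MinionConstants.MergeRollupTask.GRANULARITY_KEY` and the `TASK_TYPE + TASK_BUCKET_GRANULARITY_SUFFIX` key, whose actual values are not shown in this thread. The sketch also passes `Locale.ROOT` so the result does not depend on the JVM's default locale; in a Turkish locale, for example, `i` upper-cases to a dotted `İ`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class GranularityNormalizer {
  // Hypothetical key names standing in for the MinionConstants values.
  static final String GRANULARITY_KEY = "granularity";
  static final String BUCKET_GRANULARITY_KEY = "MergeRollupTask.bucketGranularity";

  // Builds the single-entry custom map, upper-casing the granularity so "day" and "DAY" are stored identically.
  static Map<String, String> bucketGranularityEntry(Map<String, String> taskConfigs) {
    String granularity = taskConfigs.get(GRANULARITY_KEY);
    if (granularity == null || granularity.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing task config: " + GRANULARITY_KEY);
    }
    return Collections.singletonMap(BUCKET_GRANULARITY_KEY, granularity.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    Map<String, String> configs = new HashMap<>();
    configs.put(GRANULARITY_KEY, " day ");
    System.out.println(bucketGranularityEntry(configs)); // {MergeRollupTask.bucketGranularity=DAY}
  }
}
```

Failing fast on a missing value is a design choice of this sketch. The diff above would otherwise store a `null` granularity.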
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641846049
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+ tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+ segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+ }
- MergeRollupSegmentConverter rollupSegmentConverter =
- new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
- .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
- .setTableConfig(tableConfig).build();
+ // Aggregations using configured Collector
+ List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+ CollectorConfig collectorConfig =
+ getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns, sortedColumns);
+ Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
+ "Only 'CONCAT' mode is currently supported.");
+ segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
- List<File> resultFiles = rollupSegmentConverter.convert();
+ // Segment config
+ if (numRecordsPerSegment != null) {
+ SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+ segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
Review comment:
If we don't set `SegmentConfig`, the framework will use the default one, which has `DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000`.
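The following Java sketch shows the effect of that default on output size. It assumes each output segment is filled up to the record limit before a new one starts, so the count is a per-partition estimate; partitioning can produce more segments. The helper names are hypothetical. The executor itself passes the raw string to `getSegmentConfig(numRecordsPerSegment)`, whose parsing is not shown here.

```java
public class SegmentCountEstimate {
  // Default quoted in the review comment for the framework's SegmentConfig.
  static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;

  // Task configs are strings; an absent key means the framework default applies.
  static int effectiveMaxRecords(String maxNumRecordsPerSegment) {
    if (maxNumRecordsPerSegment == null) {
      return DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
    }
    int value = Integer.parseInt(maxNumRecordsPerSegment.trim());
    if (value <= 0) {
      throw new IllegalArgumentException("maxNumRecordsPerSegment must be positive: " + value);
    }
    return value;
  }

  // Ceiling division: number of segments if each one is filled up to the limit.
  static long estimateNumSegments(long totalRecords, int maxRecordsPerSegment) {
    return (totalRecords + maxRecordsPerSegment - 1) / maxRecordsPerSegment;
  }

  public static void main(String[] args) {
    System.out.println(estimateNumSegments(12_000_000L, effectiveMaxRecords(null)));      // 3
    System.out.println(estimateNumSegments(12_000_000L, effectiveMaxRecords("1000000"))); // 12
  }
}
```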
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644338809
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
Review comment:
I think that we should pass `SegmentConversionResult` instead of `resultSegmentName`. It has a `customProperties` field that we can probably use.
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644327186
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
Review comment:
Changed to pass `resultSegmentName` in. Once time partitioning is introduced, `MergeRollupTaskExecutor` can decide whether or not to update the metadata based on the segment name.
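The following Java sketch shows how a per-segment decision could look once the hook receives the output segment name. The types are simplified stand-ins, not Pinot's signatures; the real hook returns a `SegmentZKMetadataCustomMapModifier`. The `merged_` prefix and the `bucketGranularity` key are hypothetical, loosely motivated by the TODO item about adding merge/rollup name prefixes for generated segments.

```java
import java.util.Collections;
import java.util.Map;

public class PerSegmentModifierSketch {
  // Simplified stand-in for the base executor: the upload loop asks for custom-map entries per output segment.
  abstract static class BaseExecutor {
    protected abstract Map<String, String> customMapEntries(String resultSegmentName);
  }

  // Hypothetical executor that only tags segments carrying an assumed "merged_" name prefix.
  static class MergeRollupExecutor extends BaseExecutor {
    private static final String MERGED_PREFIX = "merged_";
    private final String granularity;

    MergeRollupExecutor(String granularity) {
      this.granularity = granularity;
    }

    @Override
    protected Map<String, String> customMapEntries(String resultSegmentName) {
      if (!resultSegmentName.startsWith(MERGED_PREFIX)) {
        return Collections.emptyMap(); // Leave the ZK metadata of this segment untouched.
      }
      return Collections.singletonMap("bucketGranularity", granularity);
    }
  }

  public static void main(String[] args) {
    BaseExecutor executor = new MergeRollupExecutor("DAY");
    for (String name : new String[]{"merged_myTable_0", "myTable_1"}) {
      System.out.println(name + " -> " + executor.customMapEntries(name));
    }
  }
}
```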
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641859333
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
##########
@@ -68,7 +68,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier() {
- return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.REPLACE, Collections
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
Review comment:
`ConvertToRawIndexMinionClusterIntegrationTest` checks the map field of the uploaded segment ZK metadata. The existing cluster integration tests (e.g. `OfflineClusterIntegrationTest`) cover the upload API, but they upload segments without a `SegmentZKMetadataCustomMapModifier`. Maybe I can modify one of the cluster integration tests to upload segments with the modifier and add tests for it?
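The diff above switches `PurgeTaskExecutor` from `ModifyMode.REPLACE` to `ModifyMode.UPDATE`. The Java sketch below models the difference under a simplifying assumption: `REPLACE` makes the new entries the whole custom map, while `UPDATE` merges them over the existing entries. Under that model, a purge no longer wipes entries written by other tasks, such as a merge/rollup bucket granularity. This is not Pinot's `SegmentZKMetadataCustomMapModifier` implementation, and it does not model details such as null-value handling. The keys and values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CustomMapModes {
  enum ModifyMode { REPLACE, UPDATE }

  // REPLACE: the new entries become the whole map. UPDATE: new entries are merged over the existing ones.
  static Map<String, String> modify(ModifyMode mode, Map<String, String> existing, Map<String, String> entries) {
    Map<String, String> result = new HashMap<>();
    if (mode == ModifyMode.UPDATE && existing != null) {
      result.putAll(existing);
    }
    result.putAll(entries);
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> existing = new HashMap<>();
    existing.put("MergeRollupTask.bucketGranularity", "DAY"); // hypothetical key
    Map<String, String> purgeEntries = new HashMap<>();
    purgeEntries.put("PurgeTask.time", "1622000000000");       // hypothetical key and value
    System.out.println("REPLACE: " + modify(ModifyMode.REPLACE, existing, purgeEntries));
    System.out.println("UPDATE:  " + modify(ModifyMode.UPDATE, existing, purgeEntries));
  }
}
```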
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644371094
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
File workingDir)
throws Exception;
+ /**
+ * Returns the segment ZK metadata custom map modifier.
+ */
+ protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
Review comment:
Can we move this to `BaseTaskExecutor`?
We have the same abstract function defined in `BaseSingleSegmentConversionExecutor`. After we move it to `BaseTaskExecutor`, we can remove it from both `BaseSingleSegmentConversionExecutor` and `BaseMultipleSegmentsConversionExecutor`.
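The following Java sketch shows the suggested hoisting with simplified stand-in classes. The hook is declared once in the common base, so the two conversion-executor bases no longer redeclare it. It assumes both subclasses agree on one signature. In the diffs above, the single-segment version takes no arguments and the multi-segment version takes the task config, so the two would need to be unified. The sketch returns a plain map instead of a `SegmentZKMetadataCustomMapModifier`, and its entries are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HoistedHookSketch {
  // Simplified stand-ins for the Pinot classes.
  abstract static class BaseTaskExecutor {
    // Declared once here, so the conversion-executor base classes no longer redeclare it.
    protected abstract Map<String, String> customMapEntries(Map<String, String> taskConfigs);
  }

  abstract static class BaseSingleSegmentConversionExecutor extends BaseTaskExecutor {
  }

  abstract static class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
  }

  static class PurgeExecutor extends BaseSingleSegmentConversionExecutor {
    @Override
    protected Map<String, String> customMapEntries(Map<String, String> taskConfigs) {
      return Collections.singletonMap("PurgeTask.purged", "true"); // hypothetical entry
    }
  }

  static class MergeRollupExecutor extends BaseMultipleSegmentsConversionExecutor {
    @Override
    protected Map<String, String> customMapEntries(Map<String, String> taskConfigs) {
      return Collections.singletonMap("MergeRollupTask.bucketGranularity", taskConfigs.get("granularity"));
    }
  }

  public static void main(String[] args) {
    Map<String, String> configs = Collections.singletonMap("granularity", "DAY");
    List<BaseTaskExecutor> executors = Arrays.asList(new PurgeExecutor(), new MergeRollupExecutor());
    for (BaseTaskExecutor executor : executors) {
      // Callers can work against the common base type.
      System.out.println(executor.getClass().getSimpleName() + " -> " + executor.customMapEntries(configs));
    }
  }
}
```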
[GitHub] [incubator-pinot] snleee commented on pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-854092399
LGTM. Thanks for addressing all the comments.
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641842651
##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -296,6 +309,21 @@ public static URI getUploadSegmentURI(URI controllerURI)
return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
}
+ public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType)
+ throws URISyntaxException {
+ return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),
Review comment:
Yes, the original `getURI()` does not work here: `segments/mytable/startReplaceSegments?type=OFFLINE` would be passed entirely as the `path`, so the `?` gets percent-encoded (`%3F`) instead of starting a query string.
```
private static URI getURI(String protocol, String host, int port, String path)
throws URISyntaxException {
if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));
}
return new URI(protocol, null, host, port, path, null, null);
}
```
So I added an overloaded `getURI()` that passes `segments/mytable/startReplaceSegments` as the `path` and `type=OFFLINE` as the `query`.
```
private static URI getURI(String protocol, String host, int port, String path, String query)
throws URISyntaxException {
if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));
}
return new URI(protocol, null, host, port, path, query, null);
}
```
I tried it locally, and it returns the correct URI.
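A minimal, self-contained sketch of the difference described above, using only `java.net.URI`. The class name `ControllerUriSketch`, the contents of `SUPPORTED_PROTOCOLS`, and the host/port are assumptions for illustration. Unlike the comment, the path is given with a leading `/`, because the multi-argument `URI` constructor rejects a relative path when a scheme is present.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ControllerUriSketch {
  // Assumed protocol whitelist, mirroring the check in the quoted snippets.
  private static final Set<String> SUPPORTED_PROTOCOLS = new HashSet<>(Arrays.asList("http", "https"));

  // Path-only variant: any '?' inside the path is treated as a path character and percent-encoded.
  static URI getURI(String protocol, String host, int port, String path)
      throws URISyntaxException {
    return getURI(protocol, host, port, path, null);
  }

  // Path + query variant: URI inserts the '?' itself and quotes each component with its own rules.
  static URI getURI(String protocol, String host, int port, String path, String query)
      throws URISyntaxException {
    if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
      throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));
    }
    return new URI(protocol, null, host, port, path, query, null);
  }

  public static void main(String[] args) throws URISyntaxException {
    URI wrong = getURI("http", "localhost", 9000, "/segments/mytable/startReplaceSegments?type=OFFLINE");
    URI right = getURI("http", "localhost", 9000, "/segments/mytable/startReplaceSegments", "type=OFFLINE");
    System.out.println(wrong); // http://localhost:9000/segments/mytable/startReplaceSegments%3Ftype=OFFLINE
    System.out.println(right); // http://localhost:9000/segments/mytable/startReplaceSegments?type=OFFLINE
  }
}
```

With the path-only variant, `getQuery()` returns `null` and the server sees a path segment literally named `startReplaceSegments?type=OFFLINE`, which is why the separate `query` argument is needed.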
--
[GitHub] [incubator-pinot] snleee commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641299163
##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -172,7 +185,7 @@ public static URI getDeleteSegmentHttpUri(String host, int port, String rawTable
String tableType)
throws URISyntaxException {
return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
- rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + tableType));
+ rawTableName + "/" + URIUtils.encode(segmentName) + "?" + TYPE_DELIMITER + tableType));
Review comment:
This API appears to be deprecated. Why do we need this change?
##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -185,7 +198,7 @@ public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int por
String tableType)
throws URISyntaxException {
return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
- rawTableName + TYPE_DELIMITER + tableType));
+ rawTableName + "?" + TYPE_DELIMITER + tableType));
Review comment:
Same here
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
##########
@@ -46,6 +48,7 @@ private MinionConstants() {
public static final String RETRY_SCALE_FACTOR_KEY = "retryScaleFactor";
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
+ public static final String REPLACE_SEGMENTS_KEY = "replaceSegments";
Review comment:
`enableSegmentsReplacement`?
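If the key is read from the task's string config map, a missing value can simply mean "disabled". A small sketch, assuming the task configs arrive as a `Map<String, String>` and that an absent key should turn segment replacement off; the class and method names are hypothetical.

```java
import java.util.Map;

public class ReplaceSegmentsFlagSketch {
  static final String REPLACE_SEGMENTS_KEY = "replaceSegments";

  // Boolean.parseBoolean(null) is false, so an absent key disables segment replacement;
  // any value other than "true" (case-insensitive) is also treated as false.
  static boolean isReplaceSegmentsEnabled(Map<String, String> taskConfigs) {
    return Boolean.parseBoolean(taskConfigs.get(REPLACE_SEGMENTS_KEY));
  }
}
```

Renaming the key as suggested would only change the constant, not the lookup.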
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
+ Header segmentZKMetadataCustomMapModifierHeader =
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+ segmentZKMetadataCustomMapModifier.toJsonString());
+
+ List<Header> httpHeaders = new ArrayList<>();
+ httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
+ httpHeaders.addAll(FileUploadDownloadClient.makeAuthHeader(authToken));
Review comment:
Can you explain why we need this?
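The hunk above records a lineage entry before the upload loop; the thread only shows the minion calling `SegmentConversionUtils.startSegmentReplace` against the controller URL. The toy, in-memory model below sketches the start / upload / end sequence of a segment replacement, assuming (not confirmed by this thread) that queries keep seeing `segmentsFrom` while the entry is in progress and switch to `segmentsTo` once it is completed. `SegmentReplaceSketch`, its state, and the `endReplace` step are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class SegmentReplaceSketch {
  enum State { IN_PROGRESS, COMPLETED }

  static final class LineageEntry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    State state = State.IN_PROGRESS;

    LineageEntry(List<String> segmentsFrom, List<String> segmentsTo) {
      this.segmentsFrom = segmentsFrom;
      this.segmentsTo = segmentsTo;
    }
  }

  private final Map<String, LineageEntry> lineage = new LinkedHashMap<>();
  private final Set<String> uploaded = new HashSet<>();
  private int nextId = 0;

  // Same parsing as the diff: comma-separated input segment names, each trimmed.
  static List<String> parseSegmentNames(String inputSegmentNames) {
    return Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
  }

  // Step 1: record the intent to replace; the returned id is kept for the final step.
  String startReplace(List<String> segmentsFrom, List<String> segmentsTo) {
    String id = "lineage_" + nextId++;
    lineage.put(id, new LineageEntry(segmentsFrom, segmentsTo));
    return id;
  }

  // Step 2: upload each output segment (modelled here as remembering its name).
  void upload(String segmentName) {
    uploaded.add(segmentName);
  }

  // Step 3: mark the entry COMPLETED only after every output segment is uploaded.
  void endReplace(String id) {
    LineageEntry entry = lineage.get(id);
    if (entry == null) {
      throw new IllegalArgumentException("Unknown lineage entry: " + id);
    }
    if (!uploaded.containsAll(entry.segmentsTo)) {
      throw new IllegalStateException("Not all segmentsTo are uploaded for " + id);
    }
    entry.state = State.COMPLETED;
  }

  // Assumed routing rule: hide segmentsTo while in progress, hide segmentsFrom once completed.
  Set<String> queryableSegments(Set<String> allSegments) {
    Set<String> result = new TreeSet<>(allSegments);
    for (LineageEntry entry : lineage.values()) {
      result.removeAll(entry.state == State.COMPLETED ? entry.segmentsFrom : entry.segmentsTo);
    }
    return result;
  }
}
```

Under that assumed rule, starting the entry before uploading means a task that fails mid-upload leaves an `IN_PROGRESS` entry rather than a partially visible set of new segments.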
--
[GitHub] [incubator-pinot] codecov-commenter edited a comment on pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#issuecomment-848278058
# [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#6975](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (690e902) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/d6154188ed401fdbdea8f78c9675fea0237bb6a6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d615418) will **decrease** coverage by `7.86%`.
> The diff coverage is `42.79%`.
```diff
@@ Coverage Diff @@
## master #6975 +/- ##
============================================
- Coverage 73.31% 65.45% -7.87%
Complexity 12 12
============================================
Files 1441 1445 +4
Lines 71450 71652 +202
Branches 10354 10376 +22
============================================
- Hits 52385 46898 -5487
- Misses 15541 21357 +5816
+ Partials 3524 3397 -127
```
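The coverage percentages in the diff table can be reproduced from the Hits, Misses and Partials rows, assuming Codecov's usual definition in which partially covered lines count as not covered:

$$
\text{coverage} = \frac{\text{hits}}{\text{hits} + \text{misses} + \text{partials}}
$$

$$
\text{master: } \frac{52385}{52385 + 15541 + 3524} = \frac{52385}{71450} \approx 73.317\%
\qquad
\text{PR \#6975: } \frac{46898}{46898 + 21357 + 3397} = \frac{46898}{71652} \approx 65.452\%
$$

The denominators equal the Lines row (71450 and 71652), and the results agree with the reported 73.31% and 65.45% when truncated to two decimal places.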
| Flag | Coverage Δ | |
|---|---|---|
| integration | `?` | |
| unittests | `65.45% <42.79%> (-0.07%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...e/pinot/common/minion/MergeRollupTaskMetadata.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01lcmdlUm9sbHVwVGFza01ldGFkYXRhLmphdmE=) | `0.00% <0.00%> (ø)` | |
| [...e/pinot/common/minion/MinionTaskMetadataUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01pbmlvblRhc2tNZXRhZGF0YVV0aWxzLmphdmE=) | `0.00% <0.00%> (-75.00%)` | :arrow_down: |
| [...troller/helix/core/minion/ClusterInfoAccessor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9DbHVzdGVySW5mb0FjY2Vzc29yLmphdmE=) | `32.14% <0.00%> (-47.86%)` | :arrow_down: |
| [.../org/apache/pinot/core/common/MinionConstants.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vTWluaW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
| [...ore/minion/rollup/MergeRollupSegmentConverter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vcm9sbHVwL01lcmdlUm9sbHVwU2VnbWVudENvbnZlcnRlci5qYXZh) | `89.28% <ø> (ø)` | |
| [.../tasks/BaseMultipleSegmentsConversionExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvQmFzZU11bHRpcGxlU2VnbWVudHNDb252ZXJzaW9uRXhlY3V0b3IuamF2YQ==) | `2.59% <0.00%> (-87.24%)` | :arrow_down: |
| [...ot/plugin/minion/tasks/SegmentConversionUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvU2VnbWVudENvbnZlcnNpb25VdGlscy5qYXZh) | `0.00% <0.00%> (-52.95%)` | :arrow_down: |
| [...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcHVyZ2UvUHVyZ2VUYXNrRXhlY3V0b3IuamF2YQ==) | `70.83% <0.00%> (ø)` | |
| [...egments/RealtimeToOfflineSegmentsTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvcmVhbHRpbWVfdG9fb2ZmbGluZV9zZWdtZW50cy9SZWFsdGltZVRvT2ZmbGluZVNlZ21lbnRzVGFza0V4ZWN1dG9yLmphdmE=) | `78.26% <0.00%> (-11.45%)` | :arrow_down: |
| [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `19.46% <5.26%> (-38.91%)` | :arrow_down: |
| ... and [353 more](https://codecov.io/gh/apache/incubator-pinot/pull/6975/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [d615418...690e902](https://codecov.io/gh/apache/incubator-pinot/pull/6975?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644326007
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeRollupSegmentProcessorFramework.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
+ * the configuration.
+ *
+ * TODO:
+ * 1. Add the support for roll-up
+ * 2. Add the support to make result segments time aligned
+ * 3. Add merge/roll-up prefixes for result segments
+ */
+public class MergeRollupSegmentProcessorFramework {
+ private List<File> _originalIndexDirs;
+ private final File _workingDir;
+ private final String _inputSegmentDir;
+ private final String _outputSegmentDir;
+ private final SegmentProcessorConfig _segmentProcessorConfig;
+
+ private MergeRollupSegmentProcessorFramework(TableConfig tableConfig, Schema schema, String mergeType,
Review comment:
Done
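The class comment distinguishes concatenating segments from rolling them up (roll-up is still a TODO in this file). A toy illustration of the difference on in-memory rows, assuming a single dimension, a time bucket, and a SUM-aggregated metric; `Row`, `concat`, and `rollup` are hypothetical names and do not use Pinot's collector classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeRollupSketch {
  // Hypothetical row: one dimension, one time bucket, one additive metric.
  static final class Row {
    final String dimension;
    final long timeBucket;
    final long metric;

    Row(String dimension, long timeBucket, long metric) {
      this.dimension = dimension;
      this.timeBucket = timeBucket;
      this.metric = metric;
    }
  }

  // Merge ("concatenate"): every input row is kept; only the segment boundaries change.
  static List<Row> concat(List<List<Row>> segments) {
    List<Row> out = new ArrayList<>();
    for (List<Row> segment : segments) {
      out.addAll(segment);
    }
    return out;
  }

  // Roll-up: rows sharing (dimension, timeBucket) collapse into one row with the metric summed.
  static List<Row> rollup(List<List<Row>> segments) {
    Map<List<Object>, Long> sums = new LinkedHashMap<>();
    for (List<Row> segment : segments) {
      for (Row row : segment) {
        List<Object> key = Arrays.asList(row.dimension, row.timeBucket);
        sums.merge(key, row.metric, Long::sum);
      }
    }
    List<Row> out = new ArrayList<>();
    for (Map.Entry<List<Object>, Long> entry : sums.entrySet()) {
      List<Object> key = entry.getKey();
      out.add(new Row((String) key.get(0), (Long) key.get(1), entry.getValue()));
    }
    return out;
  }
}
```

Concatenation reduces the number of segments but not rows; roll-up also reduces rows, at the cost of losing per-row detail within each key.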
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
Review comment:
Changed to pass in `resultSegmentName`. Once time partitioning is introduced, `MergeRollupTaskExecutor` can decide whether to update the metadata based on the segment name.
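One way such a per-segment hook could look: the modifier is chosen from `resultSegmentName` and either replaces or merges into the existing segment ZK metadata custom map. `ModifyMode`, the `merged_` prefix, and the `bucketGranularity` key are hypothetical names for illustration, not the API of Pinot's `SegmentZKMetadataCustomMapModifier`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CustomMapModifierSketch {
  // Hypothetical modes: REPLACE overwrites the whole custom map, UPDATE merges new entries into it.
  enum ModifyMode { REPLACE, UPDATE }

  static final class CustomMapModifier {
    private final ModifyMode mode;
    private final Map<String, String> entries;

    CustomMapModifier(ModifyMode mode, Map<String, String> entries) {
      this.mode = mode;
      this.entries = new HashMap<>(entries);
    }

    // Returns the new custom map; the input map is never mutated.
    Map<String, String> apply(Map<String, String> existing) {
      if (mode == ModifyMode.REPLACE || existing == null) {
        return new HashMap<>(entries);
      }
      Map<String, String> merged = new HashMap<>(existing);
      merged.putAll(entries);
      return merged;
    }
  }

  // Hypothetical prefix and key; the thread only says the decision will depend on the segment name.
  static final String MERGED_PREFIX = "merged_";
  static final String BUCKET_GRANULARITY_KEY = "bucketGranularity";

  static CustomMapModifier modifierFor(String resultSegmentName, String bucketGranularity) {
    if (!resultSegmentName.startsWith(MERGED_PREFIX)) {
      // Empty UPDATE: leaves the existing metadata untouched.
      return new CustomMapModifier(ModifyMode.UPDATE, Collections.<String, String>emptyMap());
    }
    return new CustomMapModifier(ModifyMode.UPDATE,
        Collections.singletonMap(BUCKET_GRANULARITY_KEY, bucketGranularity));
  }
}
```

Serializing such a modifier into an HTTP header, as the diff does with `toJsonString()`, lets the controller apply it at upload time without a separate metadata-update call.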
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -138,11 +149,32 @@ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ // Update the segment lineage to indicate that the segment replacement is in progress.
+ String lineageEntryId = null;
+ if (replaceSegmentsEnabled) {
+ List<String> segmentsFrom =
+ Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
+ List<String> segmentsTo =
+ segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
+ lineageEntryId = SegmentConversionUtils
+ .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ }
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
+ // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
+ SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = getSegmentZKMetadataCustomMapModifier(pinotTaskConfig);
Review comment:
Updated.
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
File workingDir)
throws Exception;
+ /**
+ * Returns the segment ZK metadata custom map modifier.
+ */
+ protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
Review comment:
Updated.
--
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641846225
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged segments
- * 3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
Review comment:
Yes, will update this.
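The new `INPUT_SEGMENTS_DIR` and `OUTPUT_SEGMENTS_DIR` constants suggest a split working directory for `convert()`. A sketch of that layout with `java.nio.file`, assuming each generated segment ends up as one sub-directory of the output directory named after the segment; that convention and the helper names are assumptions, not taken from the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WorkingDirSketch {
  static final String INPUT_SEGMENTS_DIR = "input_segments";
  static final String OUTPUT_SEGMENTS_DIR = "output_segments";

  // Creates <workingDir>/input_segments and <workingDir>/output_segments; safe to call repeatedly.
  static Path[] prepare(Path workingDir) throws IOException {
    Path input = Files.createDirectories(workingDir.resolve(INPUT_SEGMENTS_DIR));
    Path output = Files.createDirectories(workingDir.resolve(OUTPUT_SEGMENTS_DIR));
    return new Path[]{input, output};
  }

  // Treats each sub-directory of the output dir as one generated segment; plain files are ignored.
  static List<String> outputSegmentNames(Path outputDir) throws IOException {
    try (Stream<Path> children = Files.list(outputDir)) {
      return children.filter(Files::isDirectory)
          .map(p -> p.getFileName().toString())
          .sorted()
          .collect(Collectors.toList());
    }
  }
}
```

Keeping inputs and outputs in sibling directories makes it easy to enumerate only the generated segments when building the conversion results.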
--
[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #6975: Merge rollup executor enhancement
Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r644379499
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
##########
@@ -64,6 +70,12 @@
File workingDir)
throws Exception;
+ /**
+ * Returns the segment ZK metadata custom map modifier.
+ */
+ protected abstract SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
Review comment:
Updated.
--