Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/28 20:54:11 UTC

[GitHub] [pinot] noon-stripe opened a new pull request, #8611: Merge Ingestion Aggregation Feature

noon-stripe opened a new pull request, #8611:
URL: https://github.com/apache/pinot/pull/8611

   Description
   This PR adds Ingestion Aggregation. The design doc can be found https://github.com/apache/pinot/issues/8360.
   
   This feature aggregates values at ingestion time, which reduces the number of rows stored (and thus speeds up queries), using the same strategy as the existing 'aggregateMetrics' feature. It extends that feature beyond SUM to include COUNT, MIN, and MAX, and is structured so that further aggregation functions can be added later.
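   To make the row reduction concrete, here is a minimal Java sketch of ingestion-time rollup under simplifying assumptions: `RollupSketch` and `AggType` are hypothetical names, rows are keyed by a `List<String>` of dimension values, and metrics are plain doubles. It only illustrates the idea; the PR itself works inside `MutableSegmentImpl` with a record id map and mutable forward indexes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the supported ingestion aggregation functions.
enum AggType { SUM, MIN, MAX, COUNT }

// Hypothetical sketch of ingestion-time rollup; not Pinot's MutableSegmentImpl.
final class RollupSketch {
  private final AggType _type;
  // Key: the dimension values of a row. Value: the aggregated metric for that key.
  private final Map<List<String>, Double> _rows = new HashMap<>();

  RollupSketch(AggType type) {
    _type = type;
  }

  void ingest(List<String> dimensions, double value) {
    // COUNT ignores the incoming value and contributes 1 per ingested row.
    double incoming = _type == AggType.COUNT ? 1.0 : value;
    // A new key adds a stored row; an existing key is updated in place.
    _rows.merge(dimensions, incoming, (current, next) -> combine(current, next));
  }

  private double combine(double current, double next) {
    switch (_type) {
      case MIN:
        return Math.min(current, next);
      case MAX:
        return Math.max(current, next);
      default: // SUM and COUNT both accumulate by addition.
        return current + next;
    }
  }

  int numStoredRows() {
    return _rows.size();
  }

  Double get(List<String> dimensions) {
    return _rows.get(dimensions);
  }
}
```

   With MIN, ingesting (us, web, 5.0), (us, web, 3.0) and (eu, web, 7.0) leaves two stored rows instead of three, and the (us, web) row holds 3.0.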
   
   Upgrade Notes
   Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   
    Yes (Please label as backward-incompat, and complete the section below on Release Notes)
   Does this PR fix a zero-downtime upgrade introduced earlier?
   
    Yes (Please label this as backward-incompat, and complete the section below on Release Notes)
   Does this PR otherwise need attention when creating release notes? Things to consider:
   
   New configuration options
   Deprecation of configurations
   Signature changes to public methods/interfaces
   New plugins added or old plugins removed
    Yes (Please label this PR as release-notes and complete the section on Release Notes)
   Release Notes
   Ingestion Pre-Aggregation is now supported for MIN, MAX, and COUNT, in addition to SUM. To enable the feature, add aggregationConfigs to the ingestionConfig of a realtime table config. The format of the config is:
   
   "aggregationConfigs": [
     {
       "columnName": "destColumn",
       "aggregationFunction": "MIN(srcColumn)"
     }
   ],
   The destColumn must be present in the schema as a metric column, and the srcColumn must not be in the schema. Additionally, every destColumn must be listed in noDictionaryColumns.
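   The constraints above can be summarized as a small Java check. This is a hedged sketch, not the PR's TableConfigUtils code: `AggregationRuleCheck` and its map-and-set inputs are hypothetical simplifications. It also includes two rules that appear in the PR's validation code quoted later in this thread (aggregateMetrics must not be enabled alongside aggregation configs, and the destination must be a metric column).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the release-note rules; not Pinot's TableConfigUtils.
final class AggregationRuleCheck {
  /**
   * @param destToSrc destination column mapped to the source column parsed from e.g. "MIN(srcColumn)"
   * @return human-readable violations; empty when the config is valid
   */
  static List<String> validate(Map<String, String> destToSrc, Set<String> schemaColumns,
      Set<String> metricColumns, Set<String> noDictionaryColumns, boolean aggregateMetrics) {
    List<String> errors = new ArrayList<>();
    if (aggregateMetrics && !destToSrc.isEmpty()) {
      errors.add("aggregateMetrics cannot be set together with aggregationConfigs");
    }
    for (Map.Entry<String, String> entry : destToSrc.entrySet()) {
      String dest = entry.getKey();
      String src = entry.getValue();
      if (!schemaColumns.contains(dest)) {
        errors.add("destination column '" + dest + "' must be in the schema");
      } else if (!metricColumns.contains(dest)) {
        // Checked only once the column is known to exist, like a null check before getFieldType().
        errors.add("destination column '" + dest + "' must be a metric column");
      }
      if (schemaColumns.contains(src)) {
        errors.add("source column '" + src + "' must not be in the schema");
      }
      if (!noDictionaryColumns.contains(dest)) {
        errors.add("destination column '" + dest + "' must be a noDictionaryColumn");
      }
    }
    return errors;
  }
}
```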
   
   Documentation
   I will add the documentation after this PR is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875043785


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -336,10 +353,15 @@ public long getLatestIngestionTimestamp() {
       // Null value vector
       MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null;
 
+      String sourceColumn = metricsAggregators.containsKey(column) ? metricsAggregators.get(column).getFirst() : column;
+      ValueAggregator valueAggregator =
+          metricsAggregators.containsKey(column) ? metricsAggregators.get(column).getSecond() : null;

Review Comment:
   done





[GitHub] [pinot] Jackie-Jiang merged pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged PR #8611:
URL: https://github.com/apache/pinot/pull/8611




[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870770105


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -506,13 +510,26 @@ private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row,
     return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
   }
 
+  /**
+   *
+   * This helper function checks if the value is null, which is only applicable if the column isn't "*",
+   * which is a side effect of metrics aggregation "COUNT" operation.
+   *
+   * @param column
+   * @param value
+   * @return is the value null if the column is an actual data column (not "*").
+   */
+  private boolean isColumnValueNull(String column, Object value) {
+    return !column.equals("*") && value == null;
+  }
+
   private void updateDictionary(GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
-      String column = entry.getKey();
+      String column = _ingestionAggregator.getMetricName(entry.getKey());

Review Comment:
   I've fixed this. 
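   The special case for "*" can be seen in a small Java sketch. Only `isColumnValueNull` is taken from the diff; `countUsableRows` and the representation of rows as a `Map` are hypothetical simplifications. With COUNT(*), the source column is "*", which no ingested row contains, so the lookup returns null even though the row must still be counted.

```java
import java.util.List;
import java.util.Map;

// Sketch around the isColumnValueNull helper from the diff above.
final class CountStarNullCheck {
  // Copied from the diff: "*" (the COUNT(*) source column) is never treated as null.
  static boolean isColumnValueNull(String column, Object value) {
    return !column.equals("*") && value == null;
  }

  // Hypothetical helper: counts rows whose source value is usable, reading each row as a Map.
  static int countUsableRows(List<Map<String, Object>> rows, String sourceColumn) {
    int count = 0;
    for (Map<String, Object> row : rows) {
      // For "*" this lookup always returns null, because no row has a "*" field.
      Object value = row.get(sourceColumn);
      if (!isColumnValueNull(sourceColumn, value)) {
        count++;
      }
    }
    return count;
  }
}
```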





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875070266


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {

Review Comment:
   Which check? I believe these are in the correct order, right? 
   
   ```java
   FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
   Preconditions.checkState(fieldSpec != null, "The destination column '" + columnName
       + "' of the aggregation function must be present in the schema");
   Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC,
       "The destination column '" + columnName + "' of the aggregation function must be a metric column");
   ```





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875040530


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -20,6 +20,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import groovy.lang.Tuple2;

Review Comment:
   done





[GitHub] [pinot] codecov-commenter commented on pull request #8611: Merge Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8611:
URL: https://github.com/apache/pinot/pull/8611#issuecomment-1112698554

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8611?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8611](https://codecov.io/gh/apache/pinot/pull/8611?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (db7fb0d) into [master](https://codecov.io/gh/apache/pinot/commit/232b946419d05b785610e9b2daf7467f5f8bee82?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (232b946) will **decrease** coverage by `1.60%`.
   > The diff coverage is `76.92%`.
   
   > :exclamation: Current head db7fb0d differs from pull request most recent head ce69c7b. Consider uploading reports for the commit ce69c7b to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #8611      +/-   ##
   ============================================
   - Coverage     70.68%   69.08%   -1.61%     
   - Complexity     4321     4362      +41     
   ============================================
     Files          1693     1695       +2     
     Lines         88795    88922     +127     
     Branches      13472    13501      +29     
   ============================================
   - Hits          62767    61433    -1334     
   - Misses        21639    23174    +1535     
   + Partials       4389     4315      -74     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `25.68% <3.84%> (-0.11%)` | :arrow_down: |
   | unittests1 | `66.98% <76.92%> (+0.03%)` | :arrow_up: |
   | unittests2 | `14.17% <0.00%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8611?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/pinot/common/utils/grpc/GrpcRequestBuilder.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvZ3JwYy9HcnBjUmVxdWVzdEJ1aWxkZXIuamF2YQ==) | `72.72% <ø> (ø)` | |
   | [...che/pinot/controller/util/FileIngestionHelper.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci91dGlsL0ZpbGVJbmdlc3Rpb25IZWxwZXIuamF2YQ==) | `91.02% <ø> (ø)` | |
   | [...g/apache/pinot/spi/utils/IngestionConfigUtils.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvSW5nZXN0aW9uQ29uZmlnVXRpbHMuamF2YQ==) | `73.33% <50.00%> (-2.73%)` | :arrow_down: |
   | [...local/indexsegment/mutable/MutableSegmentImpl.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9pbmRleHNlZ21lbnQvbXV0YWJsZS9NdXRhYmxlU2VnbWVudEltcGwuamF2YQ==) | `59.44% <56.52%> (+0.63%)` | :arrow_up: |
   | [...tils/ingestionaggregation/IngestionAggregator.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91dGlscy9pbmdlc3Rpb25hZ2dyZWdhdGlvbi9Jbmdlc3Rpb25BZ2dyZWdhdG9yLmphdmE=) | `80.64% <80.64%> (ø)` | |
   | [...he/pinot/segment/local/utils/TableConfigUtils.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91dGlscy9UYWJsZUNvbmZpZ1V0aWxzLmphdmE=) | `66.53% <84.21%> (+1.39%)` | :arrow_up: |
   | [...ache/pinot/segment/local/utils/IngestionUtils.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91dGlscy9Jbmdlc3Rpb25VdGlscy5qYXZh) | `29.14% <90.90%> (+4.14%)` | :arrow_up: |
   | [...rg/apache/pinot/common/function/FunctionUtils.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vRnVuY3Rpb25VdGlscy5qYXZh) | `100.00% <100.00%> (ø)` | |
   | [...a/org/apache/pinot/common/utils/PinotDataType.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvUGlub3REYXRhVHlwZS5qYXZh) | `81.11% <100.00%> (+0.04%)` | :arrow_up: |
   | [...he/pinot/common/utils/config/TableConfigUtils.java](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvY29uZmlnL1RhYmxlQ29uZmlnVXRpbHMuamF2YQ==) | `83.82% <100.00%> (ø)` | |
   | ... and [148 more](https://codecov.io/gh/apache/pinot/pull/8611/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8611?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8611?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [232b946...ce69c7b](https://codecov.io/gh/apache/pinot/pull/8611?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870768539


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1005,7 +1075,7 @@ private int getOrCreateDocId() {
    */
   private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentConfig config,

Review Comment:
   Because _recordIdMap is final, the compiler does not allow it to be assigned within a method. I removed _ingestionAggregator as a class member and instead made sourceColumn and valueAggregator members of IndexContainer. I'm not changing the return type of this method, but everything works (see the code once I update the PR).
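   The constraint on `_recordIdMap` is standard Java: a `final` field must be definitely assigned exactly once, in its initializer or in a constructor, so assigning it from an ordinary method does not compile. Below is a minimal sketch of the usual pattern, in which a helper returns the value and the constructor assigns it. `SegmentSketch` is hypothetical, and a `Map<String, Integer>` stands in for the real `IdMap<FixedIntArray>`. It also shows the `_recordIdMap != null` check suggested elsewhere in this thread for `isAggregateMetricsEnabled()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for MutableSegmentImpl.
final class SegmentSketch {
  // A final field must be definitely assigned exactly once: here, in the constructor.
  private final Map<String, Integer> _recordIdMap;

  SegmentSketch(boolean aggregationEnabled) {
    // Writing `_recordIdMap = ...` inside a regular method would not compile,
    // so the helper only returns the value and the constructor performs the assignment.
    _recordIdMap = createRecordIdMapIfPossible(aggregationEnabled);
  }

  private static Map<String, Integer> createRecordIdMapIfPossible(boolean aggregationEnabled) {
    return aggregationEnabled ? new HashMap<>() : null;
  }

  // No separate "disabled" flag is needed: the map exists only when aggregation is on.
  boolean isAggregateMetricsEnabled() {
    return _recordIdMap != null;
  }
}
```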





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875133037


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +424,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  /**
+   * Currently only, ValueAggregators with fixed width types are allowed, so MIN, MAX, SUM, and COUNT. The reason
+   * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+   * supports random inserts and lookups. The
+   * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex only supports
+   * sequential inserts.
+   */
+  public static void validateIngestionAggregation(String name) {
+    List<AggregationFunctionType> allowed =

Review Comment:
   done
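   The allow-list idea in validateIngestionAggregation can be sketched as follows. `AggregationKind` is a hypothetical stand-in for Pinot's `AggregationFunctionType`, and the exception messages are invented. AVG and DISTINCTCOUNTHLL are included only as examples of rejected functions whose intermediate values are not a single fixed-width number (an average needs both a sum and a count, and an HLL sketch is a byte array).

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-in for a subset of Pinot's AggregationFunctionType.
enum AggregationKind { SUM, MIN, MAX, COUNT, AVG, DISTINCTCOUNTHLL }

final class IngestionAggregationCheck {
  // Only functions whose aggregated value is fixed width (one number per row).
  private static final Set<AggregationKind> ALLOWED =
      EnumSet.of(AggregationKind.SUM, AggregationKind.MIN, AggregationKind.MAX, AggregationKind.COUNT);

  static void validate(String name) {
    AggregationKind kind;
    try {
      kind = AggregationKind.valueOf(name.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalStateException("Unknown aggregation function: " + name, e);
    }
    if (!ALLOWED.contains(kind)) {
      throw new IllegalStateException("Aggregation function " + kind
          + " is not supported for ingestion aggregation; allowed: " + ALLOWED);
    }
  }
}
```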





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875068654


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {

Review Comment:
   done





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870753251


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -506,13 +510,26 @@ private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row,
     return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
   }
 
+  /**
+   *

Review Comment:
   done





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870714615


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java:
##########
@@ -360,6 +364,19 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i
           fields.addAll(functionEvaluator.getArguments());
         }
       }
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      if (aggregationConfigs != null) {
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          ExpressionContext expressionContext =
+              RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          FunctionContext functionContext = expressionContext.getFunction();
+          if (functionContext != null) {
+            for (ExpressionContext argument : functionContext.getArguments()) {
+              fields.add(argument.getIdentifier());
+            }
+          }

Review Comment:
   done
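   The loop in the diff above collects the arguments of each aggregation function so that those source columns are included in the fields extracted from the input. The PR relies on Pinot's own expression parser (`RequestContextUtils.getExpression`); the sketch below swaps in a deliberately simplified regex that only handles a single, non-nested call such as `MIN(srcColumn)`, and `AggregationSourceFields` is a hypothetical name. Note that COUNT(*) yields "*" as its argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified argument extraction; not Pinot's RequestContextUtils parser.
final class AggregationSourceFields {
  // Matches NAME(args) with no nesting, e.g. "MIN(srcColumn)" or "COUNT(*)".
  private static final Pattern CALL = Pattern.compile("^\\s*(\\w+)\\s*\\((.*)\\)\\s*$");

  static List<String> extractArguments(String aggregationFunction) {
    List<String> fields = new ArrayList<>();
    Matcher matcher = CALL.matcher(aggregationFunction);
    if (!matcher.matches()) {
      return fields; // Not a function call: nothing to add.
    }
    String args = matcher.group(2).trim();
    if (args.isEmpty()) {
      return fields;
    }
    for (String arg : args.split(",")) {
      fields.add(arg.trim());
    }
    return fields;
  }
}
```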





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875323123


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {

Review Comment:
   I see, thanks!





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r864331170


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java:
##########
@@ -31,7 +31,6 @@
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
 
-

Review Comment:
   (minor) Revert



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();

Review Comment:
   Suggest keeping a single `Map<String, Pair<String, ValueAggregator>>` instead of two separate maps, to reduce overhead.
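   A minimal sketch of the single-map layout this suggestion describes. `AggregatorEntry` and `SingleMapIngestionAggregator` are hypothetical names: `AggregatorEntry` stands in for `Pair<String, ValueAggregator>`, and a `DoubleBinaryOperator` stands in for Pinot's `ValueAggregator`. Compared with the `containsKey`/`get` pattern in the `MutableSegmentImpl` diff earlier in this thread, one `get` returns both the source column and the aggregator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.DoubleBinaryOperator;

// Hypothetical stand-in for Pair<String, ValueAggregator>.
final class AggregatorEntry {
  final String _sourceColumn;
  final DoubleBinaryOperator _aggregator; // null when the column is not aggregated

  AggregatorEntry(String sourceColumn, DoubleBinaryOperator aggregator) {
    _sourceColumn = sourceColumn;
    _aggregator = aggregator;
  }
}

final class SingleMapIngestionAggregator {
  // One map keyed by destination column, instead of two parallel maps.
  private final Map<String, AggregatorEntry> _destToEntry = new HashMap<>();

  void register(String destColumn, String sourceColumn, DoubleBinaryOperator aggregator) {
    _destToEntry.put(destColumn, new AggregatorEntry(sourceColumn, aggregator));
  }

  // A single hash lookup yields both the source column and the aggregator.
  AggregatorEntry resolve(String column) {
    AggregatorEntry entry = _destToEntry.get(column);
    // Non-aggregated columns read from themselves and have no aggregator.
    return entry != null ? entry : new AggregatorEntry(column, null);
  }
}
```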



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1058,6 +1105,10 @@ private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentC
         RECORD_ID_MAP);
   }
 
+  private boolean isAggregateMetricsEnabled() {
+    return _aggregateMetrics || _ingestionAggregator.isEnabled();

Review Comment:
   This can be simplified to `return _recordIdMap != null`, and we don't need to track `disabled` inside the `IngestionAggregator`



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/AggregationConfig.java:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;

Review Comment:
   Move this into the `ingestion` package (alongside `TransformConfig`).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());

Review Comment:
   (minor)
   ```suggestion
         return new IngestionAggregator(Collections.emptyMap(), Collections.emptyMap());
   ```
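A small sketch of the early return with `Collections.emptyMap()`, assuming a hypothetical factory that takes `{destColumn, srcColumn}` pairs instead of real `AggregationConfig` objects. `Collections.emptyMap()` returns a shared immutable instance, so nothing is allocated per segment when the feature is off.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: the factory and the {destColumn, srcColumn} config shape are hypothetical simplifications.
class EmptyAggregatorSketch {
  private final Map<String, String> _destColumnToSrcColumn;

  private EmptyAggregatorSketch(Map<String, String> destColumnToSrcColumn) {
    _destColumnToSrcColumn = destColumnToSrcColumn;
  }

  static EmptyAggregatorSketch fromConfigs(List<String[]> configs) {
    if (configs == null || configs.isEmpty()) {
      // Shared immutable singleton; any attempt to put into it throws UnsupportedOperationException.
      return new EmptyAggregatorSketch(Collections.emptyMap());
    }
    Map<String, String> destColumnToSrcColumn = new HashMap<>();
    for (String[] config : configs) {
      destColumnToSrcColumn.put(config[0], config[1]);
    }
    return new EmptyAggregatorSketch(destColumnToSrcColumn);
  }

  boolean isEnabled() {
    return !_destColumnToSrcColumn.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(fromConfigs(null).isEnabled());
    System.out.println(fromConfigs(Collections.singletonList(new String[]{"minLatency", "latency"})).isEnabled());
  }
}
```

The trade-off is that the empty case must be treated as read-only, which fits here because the maps are only populated in the factory.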



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName

Review Comment:
   We should also check that it is a metric column. We cannot aggregate a dimension or time column.
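A minimal sketch of the extra check, assuming the schema is modelled as a hypothetical column-name to `FieldType` map rather than Pinot's `Schema`/`FieldSpec`. The error messages follow the ones used elsewhere in this validation code.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the schema is modelled as a hypothetical column-name -> FieldType map.
class DestinationColumnValidationSketch {
  enum FieldType { DIMENSION, METRIC, TIME, DATE_TIME }

  static void validateDestinationColumn(Map<String, FieldType> schema, String columnName) {
    FieldType fieldType = schema.get(columnName);
    if (fieldType == null) {
      throw new IllegalStateException("The destination column '" + columnName
          + "' of the aggregation function must be present in the schema");
    }
    // Dimension and time values form the grouping key during pre-aggregation; only metrics hold aggregated values.
    if (fieldType != FieldType.METRIC) {
      throw new IllegalStateException("The destination column '" + columnName
          + "' of the aggregation function must be a metric column");
    }
  }

  public static void main(String[] args) {
    Map<String, FieldType> schema = new HashMap<>();
    schema.put("country", FieldType.DIMENSION);
    schema.put("minLatency", FieldType.METRIC);
    validateDestinationColumn(schema, "minLatency"); // passes
    try {
      validateDestinationColumn(schema, "country");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```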



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;

Review Comment:
   (minor) Suggest adding it as the last member variable, because aggregation is applied last during ingestion, after filter/transform/complexTypeHandling.



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java:
##########
@@ -40,7 +42,24 @@ private MutableSegmentImplTestUtils() {
 
   private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment__0__0__155555";
-  private static final String STEAM_NAME = "testStream";
+  private static final String STREAM_NAME = "testStream";
+
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,

Review Comment:
   Suggest adding an extra parameter `preAggregationConfigs` to the last method (the one with the actual implementation) and calling that from this method. We don't want to maintain two methods with their own implementations.
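A sketch of the delegation pattern being suggested, with hypothetical names: `SegmentStub` stands in for `MutableSegmentImpl`, and the configs are plain strings rather than `AggregationConfig` objects. The short overload holds no logic and just forwards a default.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Sketch only: SegmentStub and the String-based config list are hypothetical placeholders.
class TestUtilsDelegationSketch {
  static final class SegmentStub {
    final Set<String> noDictionaryColumns;
    final List<String> preAggregationConfigs;

    SegmentStub(Set<String> noDictionaryColumns, List<String> preAggregationConfigs) {
      this.noDictionaryColumns = noDictionaryColumns;
      this.preAggregationConfigs = preAggregationConfigs;
    }
  }

  // Convenience overload: no logic of its own, it only forwards with an empty config list.
  static SegmentStub createMutableSegment(Set<String> noDictionaryColumns) {
    return createMutableSegment(noDictionaryColumns, Collections.emptyList());
  }

  // The single method that owns the construction logic; new parameters are added here only.
  static SegmentStub createMutableSegment(Set<String> noDictionaryColumns, List<String> preAggregationConfigs) {
    return new SegmentStub(noDictionaryColumns, preAggregationConfigs);
  }

  public static void main(String[] args) {
    SegmentStub segment = createMutableSegment(Collections.singleton("minLatency"));
    System.out.println(segment.preAggregationConfigs.isEmpty());
  }
}
```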



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),

Review Comment:
   (minor) indexingConfig can never be `null`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {

Review Comment:
   (minor) Can be simplified, and `segmentConfig` should never be `null`
   ```suggestion
       if (CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) {
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>();
+
+    for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());

Review Comment:
   This API has changed. Please rebase onto the latest master.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;
+
   @JsonPropertyDescription("Config related to handling complex type")
   private final ComplexTypeConfig _complexTypeConfig;
 
   @JsonCreator
   public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable BatchIngestionConfig batchIngestionConfig,
       @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig streamIngestionConfig,
       @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig,
+      @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> aggregationConfigs,

Review Comment:
   (minor) Put it as the last argument, same for the assignment and getter



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {

Review Comment:
   ```suggestion
     public static void validateIngestionAggregation(String name) {
   ```



##########
pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java:
##########
@@ -281,9 +284,10 @@ public void testSerDe()
       Map<String, String> prefixesToRename = new HashMap<>();
       IngestionConfig ingestionConfig =
           new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"),
-              new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs,
-              new ComplexTypeConfig(fieldsToUnnest, ".",
-                      ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename));
+              new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), aggregationConfigs,

Review Comment:
   (code style) Please apply the [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#intellij), and reformat all the changes



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {
+    /**
+     * Currently, only ValueAggregators with fixed-width types are allowed, i.e. MIN, MAX, SUM, and COUNT. The reason
+     * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+     * supports random inserts and lookups. The
+     * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex} only supports
+     * sequential inserts.
+     */

Review Comment:
   Move this javadoc above the method declaration
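A sketch of the method with its javadoc moved above the declaration. As an assumption for illustration, the supported functions are kept as a plain string set instead of `AggregationFunctionType` values, and the name is upper-cased before the lookup.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Sketch only: supported names are plain strings here rather than AggregationFunctionType values.
class IngestionAggregationValidationSketch {
  private static final Set<String> SUPPORTED_FUNCTIONS =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList("MIN", "MAX", "SUM", "COUNT")));

  /**
   * Validates that an aggregation function can be used for ingestion aggregation.
   *
   * <p>Currently, only value aggregators with fixed-width types are allowed (MIN, MAX, SUM and COUNT), because
   * only the fixed-byte mutable forward index supports random inserts and lookups; the var-byte mutable forward
   * index only supports sequential inserts.
   */
  public static void validateIngestionAggregation(String name) {
    if (name == null || !SUPPORTED_FUNCTIONS.contains(name.toUpperCase(Locale.ROOT))) {
      throw new IllegalStateException("Aggregation function '" + name + "' is not supported for ingestion aggregation");
    }
  }

  public static void main(String[] args) {
    validateIngestionAggregation("min"); // passes
    try {
      validateIngestionAggregation("avg");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```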



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>();
+
+    for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());
+
+      // validation is also done when the table is created, this is just a sanity check.
+      Preconditions.checkState(!segmentConfig.aggregateMetrics(),

Review Comment:
   (minor) Move this check out of the for loop
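A sketch of hoisting the loop-invariant check, assuming hypothetical `{destColumn, srcColumn}` pairs in place of `AggregationConfig`. The condition does not depend on the config being processed, so it only needs to run once.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: configs are hypothetical {destColumn, srcColumn} pairs.
class HoistedCheckSketch {
  static Map<String, String> buildDestColumnToSrcColumn(boolean aggregateMetrics, List<String[]> configs) {
    // Checked once before the loop instead of once per config.
    if (aggregateMetrics) {
      throw new IllegalStateException("aggregateMetrics cannot be set with AggregationConfig");
    }
    Map<String, String> destColumnToSrcColumn = new HashMap<>();
    for (String[] config : configs) {
      destColumnToSrcColumn.put(config[0], config[1]);
    }
    return destColumnToSrcColumn;
  }

  public static void main(String[] args) {
    List<String[]> configs = Collections.singletonList(new String[]{"maxLatency", "latency"});
    System.out.println(buildDestColumnToSrcColumn(false, configs));
  }
}
```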



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName
+                + "' of the aggregation function must be present in the schema");
+          }
+          String aggregationFunction = aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = RequestContextUtils.getExpressionFromSQL(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be a identifier: %s", aggregationConfig);
+
+          Preconditions.checkState(schema.getFieldSpecFor(argument.getIdentifier()) == null,

Review Comment:
   Ideally we want to support something like `met1 = SUM(met1)`, where `src` and `dest` use the same name.
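One possible way to relax the source-column check so `met1 = SUM(met1)` passes, sketched under the assumption that the schema is a hypothetical set of column names. The "source must not be in the schema" rule is kept for every other case.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch only: the schema is modelled as a hypothetical set of column names.
class SourceColumnValidationSketch {
  static void validateSourceColumn(Set<String> schemaColumns, String destColumn, String srcColumn) {
    // met1 = SUM(met1): the raw value is read from the input row under the same name the aggregated
    // column is stored under, so the "source must not be in the schema" rule is skipped.
    if (srcColumn.equals(destColumn)) {
      return;
    }
    if (schemaColumns.contains(srcColumn)) {
      throw new IllegalStateException("The source column '" + srcColumn
          + "' of the aggregation function must not be in the schema unless it is also the destination column");
    }
  }

  public static void main(String[] args) {
    Set<String> schemaColumns = new HashSet<>(Arrays.asList("met1", "met2"));
    validateSourceColumn(schemaColumns, "met1", "met1"); // allowed: same-name aggregation
    validateSourceColumn(schemaColumns, "met1", "rawMet1"); // allowed: source not in schema
    try {
      validateSourceColumn(schemaColumns, "met2", "met1"); // rejected: source is another schema column
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```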



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();

Review Comment:
   We should also check that all metrics are covered. We need to pre-aggregate on all metric columns.
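A minimal sketch of the coverage check, assuming plain sets of column names. It reports which metric columns lack a config; combined with the separate "destination must be a metric" check, it is equivalent to requiring the two sets to be equal.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: metric and aggregation columns are passed as hypothetical plain name sets.
class MetricCoverageSketch {
  static void validateAllMetricsCovered(Set<String> metricColumns, Set<String> aggregationColumns) {
    // TreeSet keeps the reported names sorted, so the error message is deterministic.
    Set<String> missing = new TreeSet<>(metricColumns);
    missing.removeAll(aggregationColumns);
    if (!missing.isEmpty()) {
      throw new IllegalStateException("All metric columns must have an aggregation config; missing: " + missing);
    }
  }

  public static void main(String[] args) {
    Set<String> metrics = new HashSet<>(Arrays.asList("minLatency", "maxLatency"));
    try {
      validateAllMetricsCovered(metrics, new HashSet<>(Arrays.asList("minLatency")));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```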



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -682,26 +707,43 @@ private void recordIndexingError(String indexType) {
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
+      String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName());
       Object value = row.getValue(column);
-      MutableForwardIndex forwardIndex = _indexContainerMap.get(column)._forwardIndex;
+      MutableForwardIndex forwardIndex = _indexContainerMap.get(metricFieldSpec.getName())._forwardIndex;
       DataType dataType = metricFieldSpec.getDataType();
-      switch (dataType) {
-        case INT:
-          forwardIndex.setInt(docId, (Integer) value + forwardIndex.getInt(docId));
-          break;
-        case LONG:
-          forwardIndex.setLong(docId, (Long) value + forwardIndex.getLong(docId));
-          break;
-        case FLOAT:
-          forwardIndex.setFloat(docId, (Float) value + forwardIndex.getFloat(docId));
-          break;
-        case DOUBLE:
-          forwardIndex.setDouble(docId, (Double) value + forwardIndex.getDouble(docId));
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported data type: " + dataType + " for aggregate metric column: " + column);
+      ValueAggregator valueAggregator = _ingestionAggregator.getAggregator(metricFieldSpec.getName());
+
+      if (valueAggregator != null) {
+        switch (valueAggregator.getAggregatedValueType()) {

Review Comment:
   We should convert the result type to the forward index type if the types do not match. Users might want to store `COUNT` in an `INT` column in certain cases.
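A sketch of converting an aggregator's result to the column's stored type before writing it, in the same spirit as the `switch (dataType.getStoredType())` used in `addNewRow`. `StoredType` and `toStoredType` are hypothetical names for illustration.

```java
// Sketch only: StoredType mirrors the numeric stored types handled in the diff; the method name is hypothetical.
class StoredTypeConversionSketch {
  enum StoredType { INT, LONG, FLOAT, DOUBLE }

  // Converts an aggregated value (for example a COUNT held as a long) to the column's stored type.
  // Narrowing conversions such as LONG to INT can overflow silently, exactly as Number.intValue() does.
  static Number toStoredType(Object aggregatedValue, StoredType storedType) {
    Number number = (Number) aggregatedValue;
    switch (storedType) {
      case INT:
        return number.intValue();
      case LONG:
        return number.longValue();
      case FLOAT:
        return number.floatValue();
      case DOUBLE:
        return number.doubleValue();
      default:
        throw new UnsupportedOperationException("Unsupported stored type: " + storedType);
    }
  }

  public static void main(String[] args) {
    Object count = 42L;
    Number stored = toStoredType(count, StoredType.INT);
    System.out.println(stored.getClass().getSimpleName() + " " + stored);
  }
}
```

Whether silent narrowing is acceptable, or whether an overflow should be rejected, is a separate design decision.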



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870779190


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -706,26 +728,74 @@ private void recordIndexingError(String indexType) {
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
+      String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName());

Review Comment:
   The line below is the only place it's used, so I just referenced it directly there.





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870778748


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -530,7 +547,7 @@ private void updateDictionary(GenericRow row) {
 
   private void addNewRow(int docId, GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
-      String column = entry.getKey();
+      String column = _ingestionAggregator.getMetricName(entry.getKey());

Review Comment:
   done





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875044425


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1098,6 +1200,61 @@ void updateMVEntry(int numValuesInMVEntry) {
     }
   }
 
+  static class IngestionAggregator {

Review Comment:
   done





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875045890


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -533,6 +556,37 @@ private void addNewRow(int docId, GenericRow row) {
       String column = entry.getKey();
       IndexContainer indexContainer = entry.getValue();
 
+      // aggregate metrics is enabled.
+      if (indexContainer._valueAggregator != null) {
+        Object value = row.getValue(indexContainer._sourceColumn);
+
+        // Update numValues info
+        indexContainer._numValuesInfo.updateSVEntry();
+
+        MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
+        FieldSpec fieldSpec = indexContainer._fieldSpec;
+
+        DataType dataType = fieldSpec.getDataType();
+        value = indexContainer._valueAggregator.getInitialAggregatedValue(value);
+        switch (dataType.getStoredType()) {
+          case INT:
+            forwardIndex.setInt(docId, ((Number) value).intValue());
+            break;
+          case LONG:
+            forwardIndex.setLong(docId, ((Number) value).longValue());
+            break;
+          case FLOAT:
+            forwardIndex.setFloat(docId, ((Number) value).floatValue());
+            break;
+          case DOUBLE:
+            forwardIndex.setDouble(docId, ((Number) value).doubleValue());
+            break;
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported data type: " + dataType + " for aggregation: " + column);
+        }

Review Comment:
   yes



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -610,9 +664,13 @@ private void addNewRow(int docId, GenericRow row) {
                   "Unsupported data type: " + dataType + " for no-dictionary column: " + column);
           }
 
+          if (column.equals("*")) {

Review Comment:
   nice catch!





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875071563


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+            Preconditions.checkState(fieldSpec != null, "The destination column '" + columnName
+                + "' of the aggregation function must be present in the schema");
+            Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC,
+                "The destination column '" + columnName + "' of the aggregation function must be a metric column");
+          }
+          String aggregationFunction = aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be an identifier: %s", aggregationConfig);
+
+          aggregationSourceColumns.add(argument.getIdentifier());
+        }
+        if (schema != null) {
+          Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns),

Review Comment:
   That's a good suggestion, but I'd prefer to make the aggregation config a requirement for every metric column. Otherwise, the data is altered without the user being informed.
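Here is a minimal sketch of the check being defended. It assumes plain string collections in place of Pinot's `Schema` and `AggregationConfig`, and the class and method names are hypothetical. The set of schema metric columns must equal the set of aggregation destination columns, and the error names the columns on each side of the mismatch.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper mirroring the intent of the schema check in this hunk; not Pinot's implementation.
final class MetricCoverageCheck {
  private MetricCoverageCheck() {
  }

  /**
   * Throws if a metric column has no aggregation config, or if a config targets a column that is not a metric.
   */
  static void checkAllMetricsAggregated(List<String> schemaMetricNames, Set<String> aggregationColumns) {
    Set<String> metrics = new HashSet<>(schemaMetricNames);
    if (metrics.equals(aggregationColumns)) {
      return;
    }
    Set<String> missing = new HashSet<>(metrics);
    missing.removeAll(aggregationColumns);  // metric columns without an aggregation config
    Set<String> extra = new HashSet<>(aggregationColumns);
    extra.removeAll(metrics);  // destination columns that are not metric columns
    throw new IllegalStateException("Every metric column needs an aggregation config; missing: " + missing
        + ", not metrics: " + extra);
  }
}
```

Failing loudly here means no metric column is silently turned into an aggregate.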



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r869698932


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/AggregationConfig.java:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;

Review Comment:
   done



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java:
##########
@@ -31,7 +31,6 @@
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
 
-

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>();
+
+    for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());
+
+      // validation is also done when the table is created, this is just a sanity check.
+      Preconditions.checkState(!segmentConfig.aggregateMetrics(),

Review Comment:
   done



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName
+                + "' of the aggregation function must be present in the schema");
+          }
+          String aggregationFunction = aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = RequestContextUtils.getExpressionFromSQL(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be an identifier: %s", aggregationConfig);
+
+          Preconditions.checkState(schema.getFieldSpecFor(argument.getIdentifier()) == null,

Review Comment:
   fixed
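The validation sequence in this hunk can be sketched without Pinot's parser. This is a hypothetical stand-in, not the PR's code. A regex replaces `RequestContextUtils.getExpressionFromSQL`, so it accepts only the plain `FUNC(identifier)` form and does not handle `COUNT(*)` or nested calls. A `Set<String>` stands in for the schema. It applies the same rules as the hunk: both fields are non-null, the destination is present in the schema, destinations are not duplicated, the function has exactly one identifier argument, and the source column is not in the schema.

```java
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: a regex stands in for Pinot's SQL expression parser and only accepts FUNC(identifier).
final class AggregationConfigCheck {
  private static final Pattern SIMPLE_CALL =
      Pattern.compile("\\s*([A-Za-z_][A-Za-z0-9_]*)\\s*\\(\\s*([A-Za-z_][A-Za-z0-9_]*)\\s*\\)\\s*");

  private AggregationConfigCheck() {
  }

  /** Validates one config and returns the source column it reads from. */
  static String validate(String destColumn, String aggregationFunction, Set<String> schemaColumns,
      Set<String> seenDestColumns) {
    if (destColumn == null || aggregationFunction == null) {
      throw new IllegalStateException("columnName/aggregationFunction cannot be null");
    }
    if (!schemaColumns.contains(destColumn)) {
      throw new IllegalStateException("Destination column '" + destColumn + "' must be in the schema");
    }
    if (!seenDestColumns.add(destColumn)) {
      throw new IllegalStateException("Duplicate aggregation config found for column '" + destColumn + "'");
    }
    Matcher matcher = SIMPLE_CALL.matcher(aggregationFunction);
    if (!matcher.matches()) {
      // Rejects non-function expressions, multiple arguments, and non-identifier arguments.
      throw new IllegalStateException("Expected FUNC(identifier) but got: " + aggregationFunction);
    }
    String sourceColumn = matcher.group(2);
    if (schemaColumns.contains(sourceColumn)) {
      throw new IllegalStateException("Source column '" + sourceColumn + "' must not be in the schema");
    }
    return sourceColumn;
  }
}
```

The allowed function names are checked separately, as `validateIngestionAggregation` does in the PR.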



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {

Review Comment:
   done



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java:
##########
@@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
   private final List<TransformConfig> _transformConfigs;
 
+  @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion")
+  private final List<AggregationConfig> _aggregationConfigs;
+
   @JsonPropertyDescription("Config related to handling complex type")
   private final ComplexTypeConfig _complexTypeConfig;
 
   @JsonCreator
   public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable BatchIngestionConfig batchIngestionConfig,
       @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig streamIngestionConfig,
       @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig,
+      @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> aggregationConfigs,

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {
+    /**
+     * Currently, only ValueAggregators with fixed-width types are allowed: MIN, MAX, SUM, and COUNT. The reason
+     * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+     * supports random inserts and lookups. The
+     * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex} only supports
+     * sequential inserts.
+     */

Review Comment:
   done
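The allow-list motivated by the Javadoc above could look like the following sketch. The class name and the case-insensitive matching are assumptions, not the PR's implementation.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch of the fixed-width allow-list; not Pinot's TableConfigUtils code.
final class IngestionAggregationAllowList {
  // Only aggregations whose result fits in a fixed-width slot are allowed, because the
  // fixed-byte mutable forward index supports in-place updates by docId.
  private static final Set<String> FIXED_WIDTH_FUNCTIONS = Set.of("MIN", "MAX", "SUM", "COUNT");

  private IngestionAggregationAllowList() {
  }

  static void validateIngestionAggregation(String name) {
    String normalized = name.toUpperCase(Locale.ROOT);
    if (!FIXED_WIDTH_FUNCTIONS.contains(normalized)) {
      throw new IllegalStateException("Aggregation function '" + name
          + "' is not supported for ingestion aggregation; supported: " + FIXED_WIDTH_FUNCTIONS);
    }
  }
}
```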



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();

Review Comment:
   done, added test



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();
+    Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>();
+
+    for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) {
+      ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction());

Review Comment:
   done



##########
pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java:
##########
@@ -281,9 +284,10 @@ public void testSerDe()
       Map<String, String> prefixesToRename = new HashMap<>();
       IngestionConfig ingestionConfig =
           new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"),
-              new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs,
-              new ComplexTypeConfig(fieldsToUnnest, ".",
-                      ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename));
+              new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), aggregationConfigs,

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  static public void validateIngestionAggregation(String name) {

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1058,6 +1105,10 @@ private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentC
         RECORD_ID_MAP);
   }
 
+  private boolean isAggregateMetricsEnabled() {
+    return _aggregateMetrics || _ingestionAggregator.isEnabled();

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -682,26 +707,43 @@ private void recordIndexingError(String indexType) {
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
+      String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName());
       Object value = row.getValue(column);
-      MutableForwardIndex forwardIndex = _indexContainerMap.get(column)._forwardIndex;
+      MutableForwardIndex forwardIndex = _indexContainerMap.get(metricFieldSpec.getName())._forwardIndex;
       DataType dataType = metricFieldSpec.getDataType();
-      switch (dataType) {
-        case INT:
-          forwardIndex.setInt(docId, (Integer) value + forwardIndex.getInt(docId));
-          break;
-        case LONG:
-          forwardIndex.setLong(docId, (Long) value + forwardIndex.getLong(docId));
-          break;
-        case FLOAT:
-          forwardIndex.setFloat(docId, (Float) value + forwardIndex.getFloat(docId));
-          break;
-        case DOUBLE:
-          forwardIndex.setDouble(docId, (Double) value + forwardIndex.getDouble(docId));
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported data type: " + dataType + " for aggregate metric column: " + column);
+      ValueAggregator valueAggregator = _ingestionAggregator.getAggregator(metricFieldSpec.getName());
+
+      if (valueAggregator != null) {
+        switch (valueAggregator.getAggregatedValueType()) {

Review Comment:
   done
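The following hypothetical sketch shows why fixed-width results allow in-place updates. A `long[]` stands in for the fixed-byte forward index, and a small enum plays the role of `ValueAggregator`. The first row for a docId stores an initial value. Later rows with the same dimensions merge into that slot, and COUNT ignores the incoming value. The real code also dispatches on the aggregated value type (INT, LONG, FLOAT, DOUBLE), which this sketch reduces to `long`.

```java
// Hypothetical sketch: a long[] stands in for the fixed-byte mutable forward index and the enum
// plays the role of ValueAggregator; the value-type dispatch of the real code is reduced to long.
final class InPlaceAggregation {
  enum Agg {
    MIN, MAX, SUM, COUNT;

    /** Value stored when the first row for a docId arrives. */
    long initial(Long raw) {
      if (this == COUNT) {
        return 1L;  // COUNT never reads the value, so a null (or "*") source is fine
      }
      return raw;
    }

    /** Merges a later row's raw value into the stored aggregate. */
    long merge(long stored, Long raw) {
      switch (this) {
        case MIN:
          return Math.min(stored, raw);
        case MAX:
          return Math.max(stored, raw);
        case SUM:
          return stored + raw;
        case COUNT:
          return stored + 1;
        default:
          throw new IllegalStateException("Unsupported aggregation: " + this);
      }
    }
  }

  private final long[] _values;
  private final boolean[] _initialized;
  private final Agg _agg;

  InPlaceAggregation(int capacity, Agg agg) {
    _values = new long[capacity];
    _initialized = new boolean[capacity];
    _agg = agg;
  }

  /** Ingests one raw value; rows with identical dimension values are assumed to share a docId. */
  void ingest(int docId, Long raw) {
    if (_initialized[docId]) {
      _values[docId] = _agg.merge(_values[docId], raw);  // random-access update of a fixed-width slot
    } else {
      _values[docId] = _agg.initial(raw);
      _initialized[docId] = true;
    }
  }

  long get(int docId) {
    return _values[docId];
  }
}
```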



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName

Review Comment:
   done and added test



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java:
##########
@@ -40,7 +42,24 @@ private MutableSegmentImplTestUtils() {
 
   private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment__0__0__155555";
-  private static final String STEAM_NAME = "testStream";
+  private static final String STREAM_NAME = "testStream";
+
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());
+    }
+
+    Map<String, String> destColumnToSrcColumn = new HashMap<>();

Review Comment:
   done



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java:
##########
@@ -0,0 +1,103 @@
+package org.apache.pinot.segment.local.utils.ingestionaggregation;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.AggregationConfig;
+
+
+public class IngestionAggregator {
+  private final Map<String, String> _aggregatorColumnNameToMetricColumnName;
+  private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator;
+  private boolean _disabled;
+
+  public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) {
+    if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null
+        || segmentConfig.getIngestionAggregationConfigs().size() == 0) {
+      return new IngestionAggregator(new HashMap<>(), new HashMap<>());

Review Comment:
   done





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870638029


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -196,7 +198,7 @@ public long getLatestIngestionTimestamp() {
     _partitionColumn = config.getPartitionColumn();
     _partitionFunction = config.getPartitionFunction();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
-    _aggregateMetrics = config.aggregateMetrics();
+    _ingestionAggregator = IngestionAggregator.fromRealtimeSegmentConfig(config);

Review Comment:
   Let's initialize `_ingestionAggregator` only when `_recordIdMap` is set. We want to perform the checks before setting it up.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -506,13 +510,26 @@ private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row,
     return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
   }
 
+  /**
+   *
+   * This helper function checks if the value is null, which is only applicable if the column isn't "*",
+   * which is a side effect of metrics aggregation "COUNT" operation.
+   *
+   * @param column
+   * @param value
+   * @return is the value null if the column is an actual data column (not "*").
+   */
+  private boolean isColumnValueNull(String column, Object value) {
+    return !column.equals("*") && value == null;
+  }
+
   private void updateDictionary(GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
-      String column = entry.getKey();
+      String column = _ingestionAggregator.getMetricName(entry.getKey());

Review Comment:
   This is not required. All the metrics must be raw (non-dictionary-encoded). We should check `dictionary != null` before reading the value and performing the check.
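Here is a sketch of the suggested ordering, using hypothetical types. A `null` dictionary marks a raw column, so the column is skipped before the row value is read, and aggregated metrics need no destination-to-source name mapping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical types: a Map stands in for GenericRow, and a null Dictionary marks a raw
// (no-dictionary) column, as every aggregated metric column must be.
final class DictionaryUpdate {
  static final class Dictionary {
    private final Map<Object, Integer> _ids = new HashMap<>();

    /** Returns the dictionary id for value, assigning the next id the first time it is seen. */
    int index(Object value) {
      Integer id = _ids.get(value);
      if (id == null) {
        id = _ids.size();
        _ids.put(value, id);
      }
      return id;
    }

    int size() {
      return _ids.size();
    }
  }

  private DictionaryUpdate() {
  }

  /** Adds each row value to its column's dictionary, skipping raw columns before reading the row. */
  static void updateDictionary(Map<String, Object> row, Map<String, Dictionary> dictionaries) {
    for (Map.Entry<String, Dictionary> entry : dictionaries.entrySet()) {
      Dictionary dictionary = entry.getValue();
      if (dictionary == null) {
        continue;  // raw column: no source-column mapping or null check is needed
      }
      Object value = row.get(entry.getKey());
      if (value != null) {
        dictionary.index(value);
      }
    }
  }
}
```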



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -506,13 +510,26 @@ private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row,
     return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
   }
 
+  /**
+   *

Review Comment:
   (nit) Remove this line



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -706,26 +728,74 @@ private void recordIndexingError(String indexType) {
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
+      String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName());

Review Comment:
   Suggest renaming this to `sourceColumn` and keeping the current `column`.
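Here is a minimal sketch of the lookup behind the proposed `sourceColumn` naming. It assumes a plain map from destination column to source column (Java 10+ for `Map.copyOf`), and the class name is hypothetical. The destination name keeps addressing the forward index, and the source name is used only to read the incoming row.

```java
import java.util.Map;

// Hypothetical sketch of the destination-to-source lookup; not Pinot's IngestionAggregator.
final class SourceColumnLookup {
  private final Map<String, String> _destToSource;

  SourceColumnLookup(Map<String, String> destToSource) {
    _destToSource = Map.copyOf(destToSource);
  }

  /** Column to read from the incoming row; non-aggregated columns are read under their own name. */
  String sourceColumn(String column) {
    return _destToSource.getOrDefault(column, column);
  }

  /** Reads the raw input value that feeds the destination column. */
  Object readRaw(Map<String, Object> row, String column) {
    return row.get(sourceColumn(column));
  }
}
```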



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1005,7 +1075,7 @@ private int getOrCreateDocId() {
    */
   private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentConfig config,

Review Comment:
   We can remove the return value and set both `_recordIdMap` and `_ingestionAggregator` in this method



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -815,7 +885,7 @@ public GenericRow getRecord(int docId, GenericRow reuse) {
    * Helper method to read the value for the given document id.
    */
   private static Object getValue(int docId, MutableForwardIndex forwardIndex, @Nullable MutableDictionary dictionary,
-      int maxNumMultiValues) {
+      int maxNumMultiValues, @Nullable ValueAggregator valueAggregator) {

Review Comment:
   `valueAggregator` is not needed



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java:
##########
@@ -360,6 +364,19 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i
           fields.addAll(functionEvaluator.getArguments());
         }
       }
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      if (aggregationConfigs != null) {
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          ExpressionContext expressionContext =
+              RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          FunctionContext functionContext = expressionContext.getFunction();
+          if (functionContext != null) {
+            for (ExpressionContext argument : functionContext.getArguments()) {
+              fields.add(argument.getIdentifier());
+            }
+          }

Review Comment:
   This handles both identifiers and nested functions:
   ```suggestion
             expressionContext.getColumns(fields);
   ```
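   A collector that walks the expression tree recursively picks up columns at any depth, whereas the loop in the diff only calls `getIdentifier()` on each top-level argument. The sketch below uses a hypothetical, simplified `Expr` type rather than Pinot's `ExpressionContext`, and only illustrates the recursion that the suggested `getColumns(fields)` call is meant to provide.

```java
import java.util.List;
import java.util.Set;

// Hypothetical, simplified expression tree; not Pinot's ExpressionContext.
final class Expr {
  enum Type { IDENTIFIER, LITERAL, FUNCTION }

  final Type type;
  final String value;         // column name, literal text, or function name
  final List<Expr> arguments; // empty unless type == FUNCTION

  private Expr(Type type, String value, List<Expr> arguments) {
    this.type = type;
    this.value = value;
    this.arguments = arguments;
  }

  static Expr identifier(String name) {
    return new Expr(Type.IDENTIFIER, name, List.of());
  }

  static Expr literal(String text) {
    return new Expr(Type.LITERAL, text, List.of());
  }

  static Expr function(String name, Expr... args) {
    return new Expr(Type.FUNCTION, name, List.of(args));
  }

  // Adds every column referenced anywhere in this expression, including inside nested functions.
  void getColumns(Set<String> columns) {
    switch (type) {
      case IDENTIFIER:
        columns.add(value);
        break;
      case FUNCTION:
        for (Expr argument : arguments) {
          argument.getColumns(columns);
        }
        break;
      default:
        // Literals do not reference any column.
        break;
    }
  }
}
```

   For `MIN(PLUS(a, TIMES(b, 2)))` this collects `a` and `b`; a loop that only reads top-level identifiers would reach neither, because the only top-level argument is itself a function.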



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -586,18 +603,19 @@ private void addNewRow(int docId, GenericRow row) {
 
           // Update forward index
           DataType dataType = fieldSpec.getDataType();
+          value = column.equals("*") ? 1 : value;

Review Comment:
   For aggregated metrics, we should set the value to `ValueAggregator.getInitialAggregatedValue(value)`
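   The suggestion replaces the `"*"` special case with a per-aggregation initial value. A minimal sketch, assuming a hypothetical `IngestionAggregation` enum in place of Pinot's `ValueAggregator` and storing every value as a `double` for simplicity: COUNT seeds a new doc with 1 regardless of the input, while SUM, MIN and MAX seed it with the raw value.

```java
// Hypothetical stand-in for per-function ValueAggregator behaviour; names are illustrative only.
enum IngestionAggregation {
  SUM, MIN, MAX, COUNT;

  // Value written to the forward index for the first row that maps to a new doc.
  double getInitialAggregatedValue(Number rawValue) {
    // COUNT ignores the raw value: the first row contributes a count of 1.
    return this == COUNT ? 1.0 : rawValue.doubleValue();
  }

  // Folds one more raw value into an already aggregated value.
  double applyRawValue(double aggregated, Number rawValue) {
    switch (this) {
      case SUM:
        return aggregated + rawValue.doubleValue();
      case MIN:
        return Math.min(aggregated, rawValue.doubleValue());
      case MAX:
        return Math.max(aggregated, rawValue.doubleValue());
      case COUNT:
        return aggregated + 1;
      default:
        throw new IllegalStateException("Unsupported aggregation: " + this);
    }
  }
}
```

   Because COUNT never reads the raw value, a null input (as for the `"*"` source column) is harmless on that path.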



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -530,7 +547,7 @@ private void updateDictionary(GenericRow row) {
 
   private void addNewRow(int docId, GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
-      String column = entry.getKey();
+      String column = _ingestionAggregator.getMetricName(entry.getKey());

Review Comment:
   Suggest splitting the logic for aggregated metrics and regular columns. Mixing them complicates the handling logic and could lead to bugs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870766593


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -815,7 +885,7 @@ public GenericRow getRecord(int docId, GenericRow reuse) {
    * Helper method to read the value for the given document id.
    */
   private static Object getValue(int docId, MutableForwardIndex forwardIndex, @Nullable MutableDictionary dictionary,
-      int maxNumMultiValues) {
+      int maxNumMultiValues, @Nullable ValueAggregator valueAggregator) {

Review Comment:
   done





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870752986


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -196,7 +198,7 @@ public long getLatestIngestionTimestamp() {
     _partitionColumn = config.getPartitionColumn();
     _partitionFunction = config.getPartitionFunction();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
-    _aggregateMetrics = config.aggregateMetrics();
+    _ingestionAggregator = IngestionAggregator.fromRealtimeSegmentConfig(config);

Review Comment:
   done





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r872883391


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -20,6 +20,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import groovy.lang.Tuple2;

Review Comment:
   (minor) Let's use `org.apache.commons.lang3.tuple.Pair`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -336,10 +353,15 @@ public long getLatestIngestionTimestamp() {
       // Null value vector
       MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null;
 
+      String sourceColumn = metricsAggregators.containsKey(column) ? metricsAggregators.get(column).getFirst() : column;
+      ValueAggregator valueAggregator =
+          metricsAggregators.containsKey(column) ? metricsAggregators.get(column).getSecond() : null;

Review Comment:
   Reduce the map lookups
   ```suggestion
         Pair<String, ValueAggregator> columnAggregatorPair = metricsAggregators.get(column);
         String sourceColumn = columnAggregatorPair != null ? columnAggregatorPair.getLeft() : null;
         ValueAggregator valueAggregator = columnAggregatorPair != null ? columnAggregatorPair.getRight() : null;
   ```
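   The suggestion trades a `containsKey` plus two `get` calls for a single `get` and a null check. A minimal sketch of the pattern, assuming a hypothetical `SourceAndAggregator` holder in place of commons-lang3 `Pair<String, ValueAggregator>` (to keep it dependency-free), with the aggregator represented only by its function name:

```java
import java.util.Map;

final class MetricsAggregatorLookup {
  // Hypothetical holder standing in for Pair<String, ValueAggregator>.
  static final class SourceAndAggregator {
    final String sourceColumn;
    final String aggregator;

    SourceAndAggregator(String sourceColumn, String aggregator) {
      this.sourceColumn = sourceColumn;
      this.aggregator = aggregator;
    }
  }

  static String describe(Map<String, SourceAndAggregator> metricsAggregators, String column) {
    SourceAndAggregator entry = metricsAggregators.get(column); // the only map lookup
    String sourceColumn = entry != null ? entry.sourceColumn : null;
    String aggregator = entry != null ? entry.aggregator : null;
    if (aggregator == null) {
      return column + " (regular column)";
    }
    return column + " = " + aggregator + "(" + sourceColumn + ")";
  }
}
```

   Note that, unlike the original line, the suggestion yields `null` rather than `column` as the source column for non-aggregated columns, which matches the later `@Nullable String sourceColumn` suggestion on the `IndexContainer` constructor.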



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -610,9 +664,13 @@ private void addNewRow(int docId, GenericRow row) {
                   "Unsupported data type: " + dataType + " for no-dictionary column: " + column);
           }
 
+          if (column.equals("*")) {

Review Comment:
   We should never hit this branch



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1125,7 +1284,8 @@ private class IndexContainer implements Closeable {
         @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex invertedIndex,
         @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex textIndex,
         @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index h3Index, @Nullable BloomFilterReader bloomFilter,
-        @Nullable MutableNullValueVector nullValueVector) {
+        @Nullable MutableNullValueVector nullValueVector, String sourceColumn,

Review Comment:
   (minor)
   ```suggestion
           @Nullable MutableNullValueVector nullValueVector, @Nullable String sourceColumn,
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {

Review Comment:
   Move this check after the null check on `columnName`; otherwise it may throw an NPE.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+            Preconditions.checkState(fieldSpec != null, "The destination column '" + columnName
+                + "' of the aggregation function must be present in the schema");
+            Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC,
+                "The destination column '" + columnName + "' of the aggregation function must be a metric column");
+          }
+          String aggregationFunction = aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be an identifier: %s", aggregationConfig);
+
+          aggregationSourceColumns.add(argument.getIdentifier());
+        }
+        if (schema != null) {
+          Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns),

Review Comment:
   Should we consider using `SUM` on metric itself as the default if not configured?
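   One way the suggested default could look, as a hypothetical helper that is not part of the PR: every schema metric without an explicit config gets `SUM(<metric>)`, mirroring the summing done by the existing `aggregateMetrics` option. Configs are represented as a plain destination-column to function-string map for the sake of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DefaultAggregations {
  // Returns destination column -> aggregation function, filling in "SUM(<metric>)"
  // for every metric column that has no explicit aggregation config.
  static Map<String, String> withDefaults(List<String> metricNames, Map<String, String> configured) {
    Map<String, String> result = new LinkedHashMap<>(configured);
    for (String metric : metricNames) {
      result.putIfAbsent(metric, "SUM(" + metric + ")");
    }
    return result;
  }
}
```

   With defaults filled in this way, the equality check between the schema's metric names and the configured columns would hold by construction for any metric the user did not configure.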



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -533,6 +556,37 @@ private void addNewRow(int docId, GenericRow row) {
       String column = entry.getKey();
       IndexContainer indexContainer = entry.getValue();
 
+      // aggregate metrics is enabled.
+      if (indexContainer._valueAggregator != null) {
+        Object value = row.getValue(indexContainer._sourceColumn);
+
+        // Update numValues info
+        indexContainer._numValuesInfo.updateSVEntry();
+
+        MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
+        FieldSpec fieldSpec = indexContainer._fieldSpec;
+
+        DataType dataType = fieldSpec.getDataType();
+        value = indexContainer._valueAggregator.getInitialAggregatedValue(value);
+        switch (dataType.getStoredType()) {
+          case INT:
+            forwardIndex.setInt(docId, ((Number) value).intValue());
+            break;
+          case LONG:
+            forwardIndex.setLong(docId, ((Number) value).longValue());
+            break;
+          case FLOAT:
+            forwardIndex.setFloat(docId, ((Number) value).floatValue());
+            break;
+          case DOUBLE:
+            forwardIndex.setDouble(docId, ((Number) value).doubleValue());
+            break;
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported data type: " + dataType + " for aggregation: " + column);
+        }

Review Comment:
   Should we `continue` here?
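   Without a `continue`, an aggregated metric would fall through into the regular-column handling after its forward index is written. A minimal sketch of the split-with-`continue` shape, assuming a hypothetical `Container` in place of `IndexContainer` (an aggregated metric is recognised only by a non-null `sourceColumn`) and recording a description of each write instead of touching real indexes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class AddNewRowSketch {
  // Hypothetical stand-in for IndexContainer: sourceColumn is non-null only for aggregated metrics.
  static final class Container {
    final String sourceColumn;

    Container(String sourceColumn) {
      this.sourceColumn = sourceColumn;
    }
  }

  // Returns a description of what each column's branch would write for one incoming row.
  static List<String> addNewRow(Map<String, Container> containers, Map<String, Object> row) {
    List<String> actions = new ArrayList<>();
    for (Map.Entry<String, Container> entry : containers.entrySet()) {
      String column = entry.getKey();
      Container container = entry.getValue();
      if (container.sourceColumn != null) {
        // Aggregated metric: read the source column and write only the forward index.
        actions.add("aggregate " + column + " <- " + row.get(container.sourceColumn));
        // Skip the regular-column path (dictionary, inverted index, ...) entirely.
        continue;
      }
      // Regular column: read the column's own value and update all of its indexes.
      actions.add("index " + column + " <- " + row.get(column));
    }
    return actions;
  }
}
```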



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +424,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  /**
+   * Currently, only ValueAggregators with fixed-width types are allowed: MIN, MAX, SUM, and COUNT. The reason
+   * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+   * supports random inserts and lookups; the
+   * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex} only supports
+   * sequential inserts.
+   */
+  public static void validateIngestionAggregation(String name) {
+    List<AggregationFunctionType> allowed =

Review Comment:
   Put this as a constant `EnumSet` (`private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS`)
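   A minimal sketch of the suggested constant, assuming a hypothetical `AggregationFunctionType` enum that lists only a few values (Pinot's real enum has many more, plus its own name-lookup helpers). The `EnumSet` is built once and gives a cheap `contains` check, instead of building a list inside the method on each call.

```java
import java.util.EnumSet;
import java.util.Locale;

final class IngestionAggregationValidator {
  // Hypothetical subset of Pinot's AggregationFunctionType, for a self-contained sketch.
  enum AggregationFunctionType { SUM, MIN, MAX, COUNT, AVG, DISTINCTCOUNT }

  // Fixed-width aggregations only, as described in the Javadoc.
  private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS =
      EnumSet.of(AggregationFunctionType.SUM, AggregationFunctionType.MIN,
          AggregationFunctionType.MAX, AggregationFunctionType.COUNT);

  static void validateIngestionAggregation(String name) {
    AggregationFunctionType type;
    try {
      type = AggregationFunctionType.valueOf(name.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalStateException("Unknown aggregation function: " + name, e);
    }
    if (!SUPPORTED_INGESTION_AGGREGATIONS.contains(type)) {
      throw new IllegalStateException("Aggregation function " + name
          + " is not supported for ingestion; supported: " + SUPPORTED_INGESTION_AGGREGATIONS);
    }
  }
}
```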



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {

Review Comment:
   To be consistent with the check in the mutable segment:
   ```suggestion
         if (!CollectionUtils.isEmpty(aggregationConfigs)) {
   ```
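   A `CollectionUtils.isEmpty`-style check (for example, the one in Apache Commons Collections) treats `null` and an empty list the same way. With the `!= null` check in the diff, an empty `aggregationConfigs` list would still enter the block, skip the loop, and reach the schema-metrics equality check with an empty `aggregationColumns` set, so it could be rejected whenever the schema defines a metric. A minimal stdlib equivalent, shown only to spell out the semantics:

```java
import java.util.Collection;

final class NullSafeCollections {
  // Same contract as a CollectionUtils.isEmpty-style helper: null or empty -> true.
  static boolean isEmpty(Collection<?> collection) {
    return collection == null || collection.isEmpty();
  }
}
```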



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1098,6 +1200,61 @@ void updateMVEntry(int numValuesInMVEntry) {
     }
   }
 
+  static class IngestionAggregator {

Review Comment:
   (minor) I don't see much value in this inner util class. Suggest removing it and renaming the method to `getMetricsAggregators()`.
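   A minimal sketch of what a static `getMetricsAggregators()`-style helper might return: destination column mapped to the parsed function name and its single source column. It assumes configs arrive as a destination-column to function-string map and uses naive string parsing that only accepts `FUNC(identifier)`; the PR itself parses with `RequestContextUtils.getExpression`, so the `Aggregation` holder and parsing here are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

final class MetricsAggregatorsSketch {
  // Parsed form of one aggregation config: function name plus its single source column.
  static final class Aggregation {
    final String functionName;
    final String sourceColumn;

    Aggregation(String functionName, String sourceColumn) {
      this.functionName = functionName;
      this.sourceColumn = sourceColumn;
    }
  }

  // Maps destination column -> parsed aggregation. Only the simple "FUNC(identifier)" shape
  // is accepted; a real implementation would use a proper expression parser.
  static Map<String, Aggregation> getMetricsAggregators(Map<String, String> aggregationConfigs) {
    Map<String, Aggregation> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> config : aggregationConfigs.entrySet()) {
      String expression = config.getValue().trim();
      int open = expression.indexOf('(');
      if (open <= 0 || !expression.endsWith(")")) {
        throw new IllegalArgumentException("Expected FUNC(column) but got: " + expression);
      }
      String functionName = expression.substring(0, open).trim().toUpperCase(Locale.ROOT);
      String sourceColumn = expression.substring(open + 1, expression.length() - 1).trim();
      if (sourceColumn.isEmpty() || sourceColumn.contains(",") || sourceColumn.contains("(")) {
        throw new IllegalArgumentException("Expected a single identifier argument in: " + expression);
      }
      result.put(config.getKey(), new Aggregation(functionName, sourceColumn));
    }
    return result;
  }
}
```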





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875301392


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {

Review Comment:
   Move this block before the field spec check:
   ```
             String aggregationFunction = aggregationConfig.getAggregationFunction();
             if (columnName == null || aggregationFunction == null) {
               throw new IllegalStateException(
                   "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
             }
   ```
   Without first checking that `columnName` is not null, `schema.getFieldSpecFor(columnName)` could throw an NPE.
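   A minimal sketch of the per-config validation in the order suggested here: null check first, then the schema lookup, then the duplicate check. It assumes a hypothetical `AggregationConfig` holder and a plain `Set<String>` of metric columns standing in for the schema (null when no schema is given). However the real schema lookup treats a null name, validating nulls first gives the clearer error message.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class AggregationConfigValidation {
  // Hypothetical minimal config holder; either field may be null when deserialized.
  static final class AggregationConfig {
    final String columnName;
    final String aggregationFunction;

    AggregationConfig(String columnName, String aggregationFunction) {
      this.columnName = columnName;
      this.aggregationFunction = aggregationFunction;
    }

    @Override
    public String toString() {
      return "{columnName=" + columnName + ", aggregationFunction=" + aggregationFunction + "}";
    }
  }

  static void validate(List<AggregationConfig> configs, Set<String> metricColumns) {
    Set<String> seen = new HashSet<>();
    for (AggregationConfig config : configs) {
      // 1. Null check first, so nothing below is handed a null column name.
      if (config.columnName == null || config.aggregationFunction == null) {
        throw new IllegalStateException(
            "columnName/aggregationFunction cannot be null in AggregationConfig " + config);
      }
      // 2. Schema check, now safe to run.
      if (metricColumns != null && !metricColumns.contains(config.columnName)) {
        throw new IllegalStateException(
            "The destination column '" + config.columnName + "' must be a metric column in the schema");
      }
      // 3. Duplicate check.
      if (!seen.add(config.columnName)) {
        throw new IllegalStateException("Duplicate aggregation config found for column '" + config.columnName + "'");
      }
    }
  }
}
```

   With an immutable `Set.of(...)` standing in for the schema, `contains(null)` itself throws an NPE, which is exactly the kind of failure the reordering avoids.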





[GitHub] [pinot] noon-stripe commented on a diff in pull request #8611: Ingestion Aggregation Feature

Posted by GitBox <gi...@apache.org>.
noon-stripe commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r875044235


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1125,7 +1284,8 @@ private class IndexContainer implements Closeable {
         @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex invertedIndex,
         @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex textIndex,
         @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index h3Index, @Nullable BloomFilterReader bloomFilter,
-        @Nullable MutableNullValueVector nullValueVector) {
+        @Nullable MutableNullValueVector nullValueVector, String sourceColumn,

Review Comment:
   done


