You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/20 22:05:37 UTC

[pinot] branch master updated: Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 723b764bc9 Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)
723b764bc9 is described below

commit 723b764bc91275c0b8361d3f9135f151b6404c39
Author: Andi Miller <an...@andimiller.net>
AuthorDate: Thu Jul 20 23:05:30 2023 +0100

    Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)
---
 .../aggregator/ValueAggregatorFactory.java         |  3 +++
 .../segment/local/utils/TableConfigUtils.java      | 24 ++++++++++++++++++----
 .../segment/local/utils/TableConfigUtilsTest.java  | 21 +++++++++++++++++++
 3 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 4cd5a1ea6d..cd5388870d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -32,6 +32,9 @@ public class ValueAggregatorFactory {
 
   /**
    * Constructs a ValueAggregator from the given aggregation type.
+   *
+   * When adding a new aggregation type here, also add it to AVAILABLE_CORE_VALUE_AGGREGATORS in
+   * org.apache.pinot.segment.local.utils.TableConfigUtils so that it can be used in RealtimeToOfflineSegmentsTask.
    */
   public static ValueAggregator getValueAggregator(AggregationFunctionType aggregationType, DataType dataType) {
     switch (aggregationType) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 89cac72cac..76cba334e5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -83,6 +83,8 @@ import org.quartz.CronScheduleBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.segment.spi.AggregationFunctionType.*;
+
 
 /**
  * Utils related to table config operations
@@ -103,8 +105,7 @@ public final class TableConfigUtils {
   // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
   private static final String KINESIS_STREAM_TYPE = "kinesis";
   private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS =
-      EnumSet.of(AggregationFunctionType.SUM, AggregationFunctionType.MIN, AggregationFunctionType.MAX,
-          AggregationFunctionType.COUNT);
+      EnumSet.of(SUM, MIN, MAX, COUNT);
   private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
       ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
           RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE);
@@ -483,6 +484,11 @@ public final class TableConfigUtils {
     }
   }
 
+  public final static EnumSet<AggregationFunctionType> AVAILABLE_CORE_VALUE_AGGREGATORS =
+      EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH,
+          DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
+          SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH);
+
   @VisibleForTesting
   static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
@@ -523,8 +529,18 @@ public final class TableConfigUtils {
             if (entry.getKey().endsWith(".aggregationType")) {
               Preconditions.checkState(columnNames.contains(StringUtils.removeEnd(entry.getKey(), ".aggregationType")),
                   String.format("Column \"%s\" not found in schema!", entry.getKey()));
-              Preconditions.checkState(ImmutableSet.of("SUM", "MAX", "MIN").contains(entry.getValue().toUpperCase()),
-                  String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()));
+              try {
+                // check that it's a valid aggregation function type
+                AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+                // check that a value aggregator is available
+                if (!AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
+                  throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString());
+                }
+              } catch (IllegalArgumentException e) {
+                String err = String.format(
+                    "Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
+                throw new IllegalStateException(err);
+              }
             }
           }
         }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index a848fd1247..89f380d049 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1755,6 +1755,27 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
     }
+
+    // aggregation function that exists but has no ValueAggregator available
+    HashMap<String, String> invalidAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig);
+    invalidAgg2Config.put("myCol.aggregationType", "Histogram");
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig(
+        ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask",
+            segmentGenerationAndPushTaskConfig))).build();
+    try {
+      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
+    }
+
+    // valid agg
+    HashMap<String, String> validAggConfig = new HashMap<>(realtimeToOfflineTaskConfig);
+    validAggConfig.put("myCol.aggregationType", "distinctCountHLL");
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig(
+        ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask",
+            segmentGenerationAndPushTaskConfig))).build();
+    TableConfigUtils.validateTaskConfigs(tableConfig, schema);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org