You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/20 22:05:37 UTC
[pinot] branch master updated: Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 723b764bc9 Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)
723b764bc9 is described below
commit 723b764bc91275c0b8361d3f9135f151b6404c39
Author: Andi Miller <an...@andimiller.net>
AuthorDate: Thu Jul 20 23:05:30 2023 +0100
Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)
---
.../aggregator/ValueAggregatorFactory.java | 3 +++
.../segment/local/utils/TableConfigUtils.java | 24 ++++++++++++++++++----
.../segment/local/utils/TableConfigUtilsTest.java | 21 +++++++++++++++++++
3 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 4cd5a1ea6d..cd5388870d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -32,6 +32,9 @@ public class ValueAggregatorFactory {
/**
* Constructs a ValueAggregator from the given aggregation type.
+ *
+ * When adding entries to this please add them to the Set in org.apache.pinot.segment.local.utils.TableConfigUtils
+ * named AVAILABLE_CORE_VALUE_AGGREGATORS so that they can be used in RealtimeToOfflineTask
*/
public static ValueAggregator getValueAggregator(AggregationFunctionType aggregationType, DataType dataType) {
switch (aggregationType) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 89cac72cac..76cba334e5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -83,6 +83,8 @@ import org.quartz.CronScheduleBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.*;
+
/**
* Utils related to table config operations
@@ -103,8 +105,7 @@ public final class TableConfigUtils {
// hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";
private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS =
- EnumSet.of(AggregationFunctionType.SUM, AggregationFunctionType.MIN, AggregationFunctionType.MAX,
- AggregationFunctionType.COUNT);
+ EnumSet.of(SUM, MIN, MAX, COUNT);
private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE);
@@ -483,6 +484,11 @@ public final class TableConfigUtils {
}
}
+ public final static EnumSet<AggregationFunctionType> AVAILABLE_CORE_VALUE_AGGREGATORS =
+ EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH,
+ DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
+ SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH);
+
@VisibleForTesting
static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
TableTaskConfig taskConfig = tableConfig.getTaskConfig();
@@ -523,8 +529,18 @@ public final class TableConfigUtils {
if (entry.getKey().endsWith(".aggregationType")) {
Preconditions.checkState(columnNames.contains(StringUtils.removeEnd(entry.getKey(), ".aggregationType")),
String.format("Column \"%s\" not found in schema!", entry.getKey()));
- Preconditions.checkState(ImmutableSet.of("SUM", "MAX", "MIN").contains(entry.getValue().toUpperCase()),
- String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()));
+ try {
+ // check that it's a valid aggregation function type
+ AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+ // check that a value aggregator is available
+ if (!AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
+ throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString());
+ }
+ } catch (IllegalArgumentException e) {
+ String err = String.format(
+ "Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
+ throw new IllegalStateException(err);
+ }
}
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index a848fd1247..89f380d049 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1755,6 +1755,27 @@ public class TableConfigUtilsTest {
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
}
+
+ // aggregation function that exists but has no ValueAggregator available
+ HashMap<String, String> invalidAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig);
+ invalidAgg2Config.put("myCol.aggregationType", "Histogram");
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig(
+ ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask",
+ segmentGenerationAndPushTaskConfig))).build();
+ try {
+ TableConfigUtils.validateTaskConfigs(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
+ }
+
+ // valid agg
+ HashMap<String, String> validAggConfig = new HashMap<>(realtimeToOfflineTaskConfig);
+ validAggConfig.put("myCol.aggregationType", "distinctCountHLL");
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig(
+ ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask",
+ segmentGenerationAndPushTaskConfig))).build();
+ TableConfigUtils.validateTaskConfigs(tableConfig, schema);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org