You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/10/06 01:51:40 UTC
[incubator-pinot] 01/01: Enhance star-tree to skip matching-all predicate on non-star-tree dimension
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch enhance_star_tree
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6519f22df04d8174715530f16c101ae86841024b
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Mon Oct 5 18:44:09 2020 -0700
Enhance star-tree to skip matching-all predicate on non-star-tree dimension
---
.../plan/AggregationGroupByOrderByPlanNode.java | 34 ++---
.../core/plan/AggregationGroupByPlanNode.java | 34 ++---
.../pinot/core/plan/AggregationPlanNode.java | 36 ++---
.../apache/pinot/core/startree/StarTreeUtils.java | 147 +++++++++++++++------
.../startree/operator/StarTreeFilterOperator.java | 101 +++-----------
.../startree/plan/StarTreeDocIdSetPlanNode.java | 8 +-
.../core/startree/plan/StarTreeFilterPlanNode.java | 13 +-
.../startree/plan/StarTreeProjectionPlanNode.java | 10 +-
.../startree/plan/StarTreeTransformPlanNode.java | 9 +-
.../pinot/core/startree/v2/BaseStarTreeV2Test.java | 30 ++---
.../tests/OfflineClusterIntegrationTest.java | 12 ++
11 files changed, 210 insertions(+), 224 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index 5cb94bd..892a96d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -19,13 +19,14 @@
package org.apache.pinot.core.plan;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
@@ -59,32 +60,21 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode {
_groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]);
List<StarTreeV2> starTrees = indexSegment.getStarTrees();
- if (starTrees != null) {
- if (!StarTreeUtils.isStarTreeDisabled(queryContext)) {
- int numAggregationFunctions = _aggregationFunctions.length;
- AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
- new AggregationFunctionColumnPair[numAggregationFunctions];
- boolean hasUnsupportedAggregationFunction = false;
- for (int i = 0; i < numAggregationFunctions; i++) {
- AggregationFunctionColumnPair aggregationFunctionColumnPair =
- AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]);
- if (aggregationFunctionColumnPair != null) {
- aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair;
- } else {
- hasUnsupportedAggregationFunction = true;
- break;
- }
- }
- if (!hasUnsupportedAggregationFunction) {
- FilterContext filter = queryContext.getFilter();
+ if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
+ AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+ StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
+ if (aggregationFunctionColumnPairs != null) {
+ Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+ StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
+ if (predicateEvaluatorsMap != null) {
for (StarTreeV2 starTreeV2 : starTrees) {
if (StarTreeUtils
.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions,
- filter)) {
+ predicateEvaluatorsMap.keySet())) {
_transformPlanNode = null;
_starTreeTransformPlanNode =
- new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, filter,
- queryContext.getDebugOptions());
+ new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions,
+ predicateEvaluatorsMap, queryContext.getDebugOptions());
return;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
index acce657..e2b3ea8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
@@ -19,13 +19,14 @@
package org.apache.pinot.core.plan;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
@@ -59,32 +60,21 @@ public class AggregationGroupByPlanNode implements PlanNode {
_groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]);
List<StarTreeV2> starTrees = indexSegment.getStarTrees();
- if (starTrees != null) {
- if (!StarTreeUtils.isStarTreeDisabled(queryContext)) {
- int numAggregationFunctions = _aggregationFunctions.length;
- AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
- new AggregationFunctionColumnPair[numAggregationFunctions];
- boolean hasUnsupportedAggregationFunction = false;
- for (int i = 0; i < numAggregationFunctions; i++) {
- AggregationFunctionColumnPair aggregationFunctionColumnPair =
- AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]);
- if (aggregationFunctionColumnPair != null) {
- aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair;
- } else {
- hasUnsupportedAggregationFunction = true;
- break;
- }
- }
- if (!hasUnsupportedAggregationFunction) {
- FilterContext filter = queryContext.getFilter();
+ if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
+ AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+ StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
+ if (aggregationFunctionColumnPairs != null) {
+ Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+ StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
+ if (predicateEvaluatorsMap != null) {
for (StarTreeV2 starTreeV2 : starTrees) {
if (StarTreeUtils
.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions,
- filter)) {
+ predicateEvaluatorsMap.keySet())) {
_transformPlanNode = null;
_starTreeTransformPlanNode =
- new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, filter,
- queryContext.getDebugOptions());
+ new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions,
+ predicateEvaluatorsMap, queryContext.getDebugOptions());
return;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 4a267a2..a224125 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -19,13 +19,14 @@
package org.apache.pinot.core.plan;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
@@ -50,31 +51,20 @@ public class AggregationPlanNode implements PlanNode {
assert _aggregationFunctions != null;
List<StarTreeV2> starTrees = indexSegment.getStarTrees();
- if (starTrees != null) {
- if (!StarTreeUtils.isStarTreeDisabled(queryContext)) {
- int numAggregationFunctions = _aggregationFunctions.length;
- AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
- new AggregationFunctionColumnPair[numAggregationFunctions];
- boolean hasUnsupportedAggregationFunction = false;
- for (int i = 0; i < numAggregationFunctions; i++) {
- AggregationFunctionColumnPair aggregationFunctionColumnPair =
- AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]);
- if (aggregationFunctionColumnPair != null) {
- aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair;
- } else {
- hasUnsupportedAggregationFunction = true;
- break;
- }
- }
- if (!hasUnsupportedAggregationFunction) {
- FilterContext filter = queryContext.getFilter();
+ if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
+ AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+ StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
+ if (aggregationFunctionColumnPairs != null) {
+ Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+ StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
+ if (predicateEvaluatorsMap != null) {
for (StarTreeV2 starTreeV2 : starTrees) {
- if (StarTreeUtils
- .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null, filter)) {
+ if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null,
+ predicateEvaluatorsMap.keySet())) {
_transformPlanNode = null;
_starTreeTransformPlanNode =
- new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null, filter,
- queryContext.getDebugOptions());
+ new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null,
+ predicateEvaluatorsMap, queryContext.getDebugOptions());
return;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index ed98e98..c7340b2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -20,18 +20,30 @@ package org.apache.pinot.core.startree;
import java.io.File;
import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.predicate.Predicate;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.core.segment.store.SegmentDirectoryPaths;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
import org.apache.pinot.core.startree.v2.StarTreeV2Constants;
@@ -40,6 +52,7 @@ import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
+@SuppressWarnings("rawtypes")
public class StarTreeUtils {
private StarTreeUtils() {
}
@@ -55,17 +68,103 @@ public class StarTreeUtils {
}
/**
+ * Maps every {@link AggregationFunction} to its {@link AggregationFunctionColumnPair}. Returns {@code null} as soon as
+ * one of them has no such representation (e.g. it takes multiple arguments, or its argument is not a column), in
+ * which case the star-tree cannot serve the query.
+ */
+ @Nullable
+ public static AggregationFunctionColumnPair[] extractAggregationFunctionPairs(
+ AggregationFunction[] aggregationFunctions) {
+ AggregationFunctionColumnPair[] columnPairs = new AggregationFunctionColumnPair[aggregationFunctions.length];
+ int index = 0;
+ for (AggregationFunction aggregationFunction : aggregationFunctions) {
+ AggregationFunctionColumnPair columnPair =
+ AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunction);
+ if (columnPair == null) {
+ // Bail out on the first aggregation that cannot be looked up as a function-column pair
+ return null;
+ }
+ columnPairs[index++] = columnPair;
+ }
+ return columnPairs;
+ }
+
+ /**
+ * Extracts a map from the column to a list of {@link PredicateEvaluator}s for it. Returns {@code null} if the filter
+ * cannot be solved by the star-tree.
+ * <p>Predicates that always evaluate to true are dropped, so only the columns that actually need filtering appear
+ * as keys of the returned map. An empty map is returned when there is no filter, or when every predicate matches all
+ * records.
+ * <p>{@code null} is returned when the filter contains an OR, a predicate on a non-identifier expression or on a
+ * column without dictionary, a REGEXP_LIKE/TEXT_MATCH/IS_NULL/IS_NOT_NULL predicate, or a predicate that never
+ * matches.
+ */
+ @Nullable
+ public static Map<String, List<PredicateEvaluator>> extractPredicateEvaluatorsMap(IndexSegment indexSegment,
+ @Nullable FilterContext filter) {
+ if (filter == null) {
+ return Collections.emptyMap();
+ }
+
+ Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = new HashMap<>();
+ Queue<FilterContext> queue = new LinkedList<>();
+ queue.add(filter);
+ FilterContext filterNode;
+ while ((filterNode = queue.poll()) != null) {
+ switch (filterNode.getType()) {
+ case AND:
+ queue.addAll(filterNode.getChildren());
+ break;
+ case OR:
+ // Star-tree does not support OR filter
+ return null;
+ case PREDICATE:
+ Predicate predicate = filterNode.getPredicate();
+ ExpressionContext lhs = predicate.getLhs();
+ if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
+ // Star-tree does not support non-identifier expression
+ return null;
+ }
+ String column = lhs.getIdentifier();
+ DataSource dataSource = indexSegment.getDataSource(column);
+ Dictionary dictionary = dataSource.getDictionary();
+ if (dictionary == null) {
+ // Star-tree does not support non-dictionary encoded dimension
+ return null;
+ }
+ switch (predicate.getType()) {
+ // Do not use star-tree for the following predicates because:
+ // - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids
+ // - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids
+ case REGEXP_LIKE:
+ case TEXT_MATCH:
+ case IS_NULL:
+ case IS_NOT_NULL:
+ return null;
+ }
+ PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider
+ .getPredicateEvaluator(predicate, dictionary, dataSource.getDataSourceMetadata().getDataType());
+ if (predicateEvaluator.isAlwaysFalse()) {
+ // Do not use star-tree if there is no matching record
+ return null;
+ }
+ // Skip predicates that match all records (e.g. on a non-star-tree dimension) so that their columns are not
+ // required to be star-tree dimensions
+ if (!predicateEvaluator.isAlwaysTrue()) {
+ predicateEvaluatorsMap.computeIfAbsent(column, k -> new ArrayList<>()).add(predicateEvaluator);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported filter type: " + filterNode.getType());
+ }
+ }
+ return predicateEvaluatorsMap;
+ }
+
+ /**
* Returns whether the query is fit for star tree index.
* <p>The query is fit for star tree index if the following conditions are met:
* <ul>
* <li>Star-tree contains all aggregation function column pairs</li>
* <li>All predicate columns and group-by columns are star-tree dimensions</li>
- * <li>All predicates are conjoined by AND</li>
* </ul>
*/
public static boolean isFitForStarTree(StarTreeV2Metadata starTreeV2Metadata,
AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions,
- @Nullable FilterContext filter) {
+ Set<String> predicateColumns) {
// Check aggregations
for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
if (!starTreeV2Metadata.containsFunctionColumnPair(aggregationFunctionColumnPair)) {
@@ -73,8 +172,9 @@ public class StarTreeUtils {
}
}
- // Check group-by expressions
Set<String> starTreeDimensions = new HashSet<>(starTreeV2Metadata.getDimensionsSplitOrder());
+
+ // Check group-by expressions
if (groupByExpressions != null) {
Set<String> groupByColumns = new HashSet<>();
for (ExpressionContext groupByExpression : groupByExpressions) {
@@ -85,45 +185,8 @@ public class StarTreeUtils {
}
}
- // Check filters
- return filter == null || checkFilters(filter, starTreeDimensions);
- }
-
- /**
- * Helper method to check whether all columns in predicates are star-tree dimensions, and all predicates are
- * conjoined by AND.
- */
- private static boolean checkFilters(FilterContext filter, Set<String> starTreeDimensions) {
- switch (filter.getType()) {
- case AND:
- for (FilterContext child : filter.getChildren()) {
- if (!checkFilters(child, starTreeDimensions)) {
- return false;
- }
- }
- return true;
- case OR:
- return false;
- case PREDICATE:
- Predicate predicate = filter.getPredicate();
- ExpressionContext lhs = predicate.getLhs();
- if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
- return false;
- }
- switch (predicate.getType()) {
- // NOTE: Do not use star-tree for the following predicates because:
- // - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids
- // - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids
- case REGEXP_LIKE:
- case TEXT_MATCH:
- case IS_NULL:
- case IS_NOT_NULL:
- return false;
- }
- return starTreeDimensions.contains(lhs.getIdentifier());
- default:
- throw new IllegalStateException();
- }
+ // Check predicate columns
+ return starTreeDimensions.containsAll(predicateColumns);
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
index 32d0b57..2037b71 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
@@ -22,7 +22,6 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -41,9 +40,6 @@ import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
-import org.apache.pinot.core.query.request.context.FilterContext;
-import org.apache.pinot.core.query.request.context.predicate.Predicate;
import org.apache.pinot.core.startree.StarTree;
import org.apache.pinot.core.startree.StarTreeNode;
import org.apache.pinot.core.startree.v2.StarTreeV2;
@@ -121,81 +117,23 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
// Star-tree
private final StarTreeV2 _starTreeV2;
- // Set of group-by columns
- private final Set<String> _groupByColumns;
// Map from column to predicate evaluators
private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap;
- // Map from column to matching dictionary ids
- private final Map<String, IntSet> _matchingDictIdsMap;
+ // Set of group-by columns
+ private final Set<String> _groupByColumns;
private final Map<String, String> _debugOptions;
boolean _resultEmpty = false;
- public StarTreeFilterOperator(StarTreeV2 starTreeV2, @Nullable FilterContext filter,
- @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+ /**
+ * @param starTreeV2 Star-tree to traverse
+ * @param predicateEvaluatorsMap Map from column to the predicate evaluators to apply on it
+ * @param groupByColumns Group-by columns; not modified by this operator (it works on its own copy)
+ * @param debugOptions Optional debug options
+ */
+ public StarTreeFilterOperator(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
+ Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
_starTreeV2 = starTreeV2;
- _groupByColumns = groupByColumns != null ? new HashSet<>(groupByColumns) : Collections.emptySet();
+ _predicateEvaluatorsMap = predicateEvaluatorsMap;
+ // Copy the set because it is modified below, and the caller (e.g. StarTreeFilterPlanNode) may reuse it
+ _groupByColumns = new HashSet<>(groupByColumns);
_debugOptions = debugOptions;
- if (filter != null) {
- _predicateEvaluatorsMap = new HashMap<>();
- _matchingDictIdsMap = new HashMap<>();
-
- // Process the filter tree and get a map from column to a list of predicates applied to it
- Map<String, List<Predicate>> predicatesMap = getPredicatesMap(filter);
-
- // Initialize the predicate evaluators map
- for (Map.Entry<String, List<Predicate>> entry : predicatesMap.entrySet()) {
- String columnName = entry.getKey();
- List<Predicate> predicates = entry.getValue();
- List<PredicateEvaluator> predicateEvaluators = new ArrayList<>();
-
- DataSource dataSource = starTreeV2.getDataSource(columnName);
- for (Predicate predicate : predicates) {
- PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider
- .getPredicateEvaluator(predicate, dataSource.getDictionary(),
- dataSource.getDataSourceMetadata().getDataType());
- // If predicate is always evaluated false, the result for the filter operator will be empty, early terminate
- if (predicateEvaluator.isAlwaysFalse()) {
- _resultEmpty = true;
- return;
- } else if (!predicateEvaluator.isAlwaysTrue()) {
- predicateEvaluators.add(predicateEvaluator);
- }
- }
- if (!predicateEvaluators.isEmpty()) {
- _predicateEvaluatorsMap.put(columnName, predicateEvaluators);
- }
- }
-
- // Remove columns with predicates from group-by columns because we won't use star node for that column
- _groupByColumns.removeAll(_predicateEvaluatorsMap.keySet());
- } else {
- _predicateEvaluatorsMap = Collections.emptyMap();
- _matchingDictIdsMap = Collections.emptyMap();
- }
- }
-
- /**
- * Helper method to process the filter tree and get a map from column to a list of predicates applied to it.
- */
- private Map<String, List<Predicate>> getPredicatesMap(FilterContext filter) {
- Map<String, List<Predicate>> predicatesMap = new HashMap<>();
- Queue<FilterContext> queue = new LinkedList<>();
- queue.add(filter);
-
- while (!queue.isEmpty()) {
- FilterContext filterNode = queue.remove();
- if (filterNode.getType() == FilterContext.Type.AND) {
- queue.addAll(filterNode.getChildren());
- } else {
- Predicate predicate = filterNode.getPredicate();
- String columnName = predicate.getLhs().getIdentifier();
- predicatesMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(predicate);
- }
- }
-
- return predicatesMap;
+ // Remove columns with predicates from group-by columns because we won't use star node for that column
+ _groupByColumns.removeAll(_predicateEvaluatorsMap.keySet());
}
@Override
@@ -252,13 +190,14 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
/**
* Helper method to traverse the star tree, get matching documents and keep track of all the predicate columns that
- * are not matched.
- * <p>Return <code>null</code> if no matching dictionary id found for a column (i.e. the result for the filter
- * operator is empty).
+ * are not matched. Returns {@code null} if no matching dictionary id found for a column (i.e. the result for the
+ * filter operator is empty).
*/
+ @Nullable
private StarTreeResult traverseStarTree() {
- MutableRoaringBitmap matchedDocIds = new MutableRoaringBitmap();
+ MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
Set<String> remainingPredicateColumns = new HashSet<>();
+ Map<String, IntSet> matchingDictIdsMap = new HashMap<>();
StarTree starTree = _starTreeV2.getStarTree();
List<String> dimensionNames = starTree.getDimensionNames();
@@ -267,19 +206,19 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
// Use BFS to traverse the star tree
Queue<SearchEntry> queue = new LinkedList<>();
queue.add(new SearchEntry(starTreeRootNode, _predicateEvaluatorsMap.keySet(), _groupByColumns));
- while (!queue.isEmpty()) {
- SearchEntry searchEntry = queue.remove();
+ SearchEntry searchEntry;
+ while ((searchEntry = queue.poll()) != null) {
StarTreeNode starTreeNode = searchEntry._starTreeNode;
// If all predicate columns and group-by columns are matched, we can use aggregated document
if (searchEntry._remainingPredicateColumns.isEmpty() && searchEntry._remainingGroupByColumns.isEmpty()) {
- matchedDocIds.add(starTreeNode.getAggregatedDocId());
+ matchingDocIds.add(starTreeNode.getAggregatedDocId());
} else {
// For leaf node, because we haven't exhausted all predicate columns and group-by columns, we cannot use
// the aggregated document. Add the range of documents for this node to the bitmap, and keep track of the
// remaining predicate columns for this node
if (starTreeNode.isLeaf()) {
- matchedDocIds.add(starTreeNode.getStartDocId(), starTreeNode.getEndDocId());
+ matchingDocIds.add((long) starTreeNode.getStartDocId(), starTreeNode.getEndDocId());
remainingPredicateColumns.addAll(searchEntry._remainingPredicateColumns);
} else {
// For non-leaf node, proceed to next level
@@ -290,7 +229,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
Set<String> newRemainingPredicateColumns = new HashSet<>(searchEntry._remainingPredicateColumns);
newRemainingPredicateColumns.remove(nextDimension);
- IntSet matchingDictIds = _matchingDictIdsMap.get(nextDimension);
+ IntSet matchingDictIds = matchingDictIdsMap.get(nextDimension);
if (matchingDictIds == null) {
matchingDictIds = getMatchingDictIds(_predicateEvaluatorsMap.get(nextDimension));
@@ -299,7 +238,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
return null;
}
- _matchingDictIdsMap.put(nextDimension, matchingDictIds);
+ matchingDictIdsMap.put(nextDimension, matchingDictIds);
}
int numMatchingDictIds = matchingDictIds.size();
@@ -358,7 +297,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
}
}
- return new StarTreeResult(matchedDocIds, remainingPredicateColumns);
+ return new StarTreeResult(matchingDocIds, remainingPredicateColumns);
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
index c418d21..4fe485d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
@@ -18,22 +18,24 @@
*/
package org.apache.pinot.core.startree.plan;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.startree.v2.StarTreeV2;
public class StarTreeDocIdSetPlanNode implements PlanNode {
private final StarTreeFilterPlanNode _starTreeFilterPlanNode;
- public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter,
- @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+ /**
+ * Creates a doc id set plan node backed by a {@link StarTreeFilterPlanNode}.
+ * <p>{@code groupByColumns} must not be {@code null}: it is passed down to {@link StarTreeFilterPlanNode} and the
+ * star-tree filter operator, neither of which accepts {@code null} for it.
+ */
+ public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
+ Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
- _starTreeFilterPlanNode = new StarTreeFilterPlanNode(starTreeV2, filter, groupByColumns, debugOptions);
+ _starTreeFilterPlanNode =
+ new StarTreeFilterPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
index 9628086..82b3391 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
@@ -18,31 +18,32 @@
*/
package org.apache.pinot.core.startree.plan;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.startree.operator.StarTreeFilterOperator;
import org.apache.pinot.core.startree.v2.StarTreeV2;
+/**
+ * Plan node that creates a {@link StarTreeFilterOperator} for a star-tree, the predicate evaluators to apply and the
+ * group-by columns. The same map and set instances are handed to every operator created by {@link #run()}.
+ */
public class StarTreeFilterPlanNode implements PlanNode {
private final StarTreeV2 _starTreeV2;
- private final FilterContext _filter;
+ private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap;
private final Set<String> _groupByColumns;
private final Map<String, String> _debugOptions;
- public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter,
- @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+ /**
+ * @param starTreeV2 Star-tree to filter on
+ * @param predicateEvaluatorsMap Map from column to the predicate evaluators to apply on it
+ * @param groupByColumns Group-by columns (no {@code null} accepted)
+ * @param debugOptions Optional debug options
+ */
+ public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
+ Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
_starTreeV2 = starTreeV2;
- _filter = filter;
+ _predicateEvaluatorsMap = predicateEvaluatorsMap;
_groupByColumns = groupByColumns;
_debugOptions = debugOptions;
}
@Override
public StarTreeFilterOperator run() {
+ // NOTE(review): the operator removes predicate columns from its group-by set; confirm it works on a copy so that
+ // repeated run() calls do not observe a mutated _groupByColumns
- return new StarTreeFilterOperator(_starTreeV2, _filter, _groupByColumns, _debugOptions);
+ return new StarTreeFilterOperator(_starTreeV2, _predicateEvaluatorsMap, _groupByColumns, _debugOptions);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
index e6d482d..7857e57 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
@@ -19,13 +19,14 @@
package org.apache.pinot.core.startree.plan;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.startree.v2.StarTreeV2;
@@ -34,13 +35,14 @@ public class StarTreeProjectionPlanNode implements PlanNode {
private final StarTreeDocIdSetPlanNode _starTreeDocIdSetPlanNode;
public StarTreeProjectionPlanNode(StarTreeV2 starTreeV2, Set<String> projectionColumns,
- @Nullable FilterContext filter, @Nullable Set<String> groupByColumns,
+ Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, Set<String> groupByColumns,
@Nullable Map<String, String> debugOptions) {
- _dataSourceMap = new HashMap<>(projectionColumns.size());
+ _dataSourceMap = new HashMap<>();
for (String projectionColumn : projectionColumns) {
_dataSourceMap.put(projectionColumn, starTreeV2.getDataSource(projectionColumn));
}
- _starTreeDocIdSetPlanNode = new StarTreeDocIdSetPlanNode(starTreeV2, filter, groupByColumns, debugOptions);
+ _starTreeDocIdSetPlanNode =
+ new StarTreeDocIdSetPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
index 8f50fa5..6a316fd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
@@ -25,10 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
import org.apache.pinot.core.startree.v2.StarTreeV2;
@@ -39,7 +39,7 @@ public class StarTreeTransformPlanNode implements PlanNode {
public StarTreeTransformPlanNode(StarTreeV2 starTreeV2,
AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions,
- @Nullable FilterContext filter, @Nullable Map<String, String> debugOptions) {
+ Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Map<String, String> debugOptions) {
Set<String> projectionColumns = new HashSet<>();
for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
projectionColumns.add(aggregationFunctionColumnPair.toColumnName());
@@ -54,10 +54,11 @@ public class StarTreeTransformPlanNode implements PlanNode {
projectionColumns.addAll(groupByColumns);
} else {
_groupByExpressions = Collections.emptyList();
- groupByColumns = null;
+ groupByColumns = Collections.emptySet();
}
_starTreeProjectionPlanNode =
- new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, filter, groupByColumns, debugOptions);
+ new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, predicateEvaluatorsMap, groupByColumns,
+ debugOptions);
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index 7b46277..4ea61cf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -40,10 +40,10 @@ import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.plan.FilterPlanNode;
import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -52,6 +52,7 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.core.startree.plan.StarTreeFilterPlanNode;
import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder;
import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder.BuildMode;
@@ -185,15 +186,11 @@ abstract class BaseStarTreeV2Test<R, A> {
// Aggregations
AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
- assert aggregationFunctions != null;
+ assertNotNull(aggregationFunctions);
int numAggregations = aggregationFunctions.length;
- List<AggregationFunctionColumnPair> functionColumnPairs = new ArrayList<>(numAggregations);
- for (AggregationFunction aggregationFunction : aggregationFunctions) {
- AggregationFunctionColumnPair aggregationFunctionColumnPair =
- AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunction);
- assertNotNull(aggregationFunctionColumnPair);
- functionColumnPairs.add(aggregationFunctionColumnPair);
- }
+ AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+ StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
+ assertNotNull(aggregationFunctionColumnPairs);
// Group-by columns
Set<String> groupByColumnSet = new HashSet<>();
@@ -208,16 +205,15 @@ abstract class BaseStarTreeV2Test<R, A> {
// Filter
FilterContext filter = queryContext.getFilter();
+ Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+ StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, filter);
+ assertNotNull(predicateEvaluatorsMap);
// Extract values with star-tree
- PlanNode starTreeFilterPlanNode;
- if (groupByColumns.isEmpty()) {
- starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, null, null);
- } else {
- starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, groupByColumnSet, null);
- }
+ PlanNode starTreeFilterPlanNode =
+ new StarTreeFilterPlanNode(_starTreeV2, predicateEvaluatorsMap, groupByColumnSet, null);
List<ForwardIndexReader> starTreeAggregationColumnReaders = new ArrayList<>(numAggregations);
- for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) {
+ for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
starTreeAggregationColumnReaders
.add(_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex());
}
@@ -232,7 +228,7 @@ abstract class BaseStarTreeV2Test<R, A> {
PlanNode nonStarTreeFilterPlanNode = new FilterPlanNode(_indexSegment, queryContext);
List<ForwardIndexReader> nonStarTreeAggregationColumnReaders = new ArrayList<>(numAggregations);
List<Dictionary> nonStarTreeAggregationColumnDictionaries = new ArrayList<>(numAggregations);
- for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) {
+ for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
if (aggregationFunctionColumnPair.getFunctionType() == AggregationFunctionType.COUNT) {
nonStarTreeAggregationColumnReaders.add(null);
nonStarTreeAggregationColumnDictionaries.add(null);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index f2bf7dc..0d3c3b0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -477,6 +477,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
+ // Should be able to use the star-tree with an additional match-all predicate on another dimension
+ firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1 + " AND DaysSinceEpoch > 16070");
+ assertEquals(firstQueryResponse.get("aggregationResults").get(0).get("value").asInt(), firstQueryResult);
+ assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
+ assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
+
// Test the second query
JsonNode secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
int secondQueryResult = secondQueryResponse.get("aggregationResults").get(0).get("value").asInt();
@@ -518,6 +524,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
+ // Should be able to use the star-tree with an additional match-all predicate on another dimension
+ secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2 + " AND DaysSinceEpoch > 16070");
+ assertEquals(secondQueryResponse.get("aggregationResults").get(0).get("value").asInt(), secondQueryResult);
+ assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
+ assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
+
// Remove the star-tree index config and trigger reload
indexingConfig.setStarTreeIndexConfigs(null);
updateTableConfig(tableConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org