You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/10/06 01:51:40 UTC

[incubator-pinot] 01/01: Enhance star-tree to skip matching-all predicate on non-star-tree dimension

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch enhance_star_tree
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6519f22df04d8174715530f16c101ae86841024b
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Mon Oct 5 18:44:09 2020 -0700

    Enhance star-tree to skip matching-all predicate on non-star-tree dimension
---
 .../plan/AggregationGroupByOrderByPlanNode.java    |  34 ++---
 .../core/plan/AggregationGroupByPlanNode.java      |  34 ++---
 .../pinot/core/plan/AggregationPlanNode.java       |  36 ++---
 .../apache/pinot/core/startree/StarTreeUtils.java  | 147 +++++++++++++++------
 .../startree/operator/StarTreeFilterOperator.java  | 101 +++-----------
 .../startree/plan/StarTreeDocIdSetPlanNode.java    |   8 +-
 .../core/startree/plan/StarTreeFilterPlanNode.java |  13 +-
 .../startree/plan/StarTreeProjectionPlanNode.java  |  10 +-
 .../startree/plan/StarTreeTransformPlanNode.java   |   9 +-
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java |  30 ++---
 .../tests/OfflineClusterIntegrationTest.java       |  12 ++
 11 files changed, 210 insertions(+), 224 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index 5cb94bd..892a96d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -19,13 +19,14 @@
 package org.apache.pinot.core.plan;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
@@ -59,32 +60,21 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode {
     _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]);
 
     List<StarTreeV2> starTrees = indexSegment.getStarTrees();
-    if (starTrees != null) {
-      if (!StarTreeUtils.isStarTreeDisabled(queryContext)) {
-        int numAggregationFunctions = _aggregationFunctions.length;
-        AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
-            new AggregationFunctionColumnPair[numAggregationFunctions];
-        boolean hasUnsupportedAggregationFunction = false;
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          AggregationFunctionColumnPair aggregationFunctionColumnPair =
-              AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]);
-          if (aggregationFunctionColumnPair != null) {
-            aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair;
-          } else {
-            hasUnsupportedAggregationFunction = true;
-            break;
-          }
-        }
-        if (!hasUnsupportedAggregationFunction) {
-          FilterContext filter = queryContext.getFilter();
+    if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
+      AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+          StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
+      if (aggregationFunctionColumnPairs != null) {
+        Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+            StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
+        if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
             if (StarTreeUtils
                 .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions,
-                    filter)) {
+                    predicateEvaluatorsMap.keySet())) {
               _transformPlanNode = null;
               _starTreeTransformPlanNode =
-                  new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, filter,
-                      queryContext.getDebugOptions());
+                  new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions,
+                      predicateEvaluatorsMap, queryContext.getDebugOptions());
               return;
             }
           }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
index acce657..e2b3ea8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
@@ -19,13 +19,14 @@
 package org.apache.pinot.core.plan;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
@@ -59,32 +60,21 @@ public class AggregationGroupByPlanNode implements PlanNode {
     _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]);
 
     List<StarTreeV2> starTrees = indexSegment.getStarTrees();
-    if (starTrees != null) {
-      if (!StarTreeUtils.isStarTreeDisabled(queryContext)) {
-        int numAggregationFunctions = _aggregationFunctions.length;
-        AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
-            new AggregationFunctionColumnPair[numAggregationFunctions];
-        boolean hasUnsupportedAggregationFunction = false;
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          AggregationFunctionColumnPair aggregationFunctionColumnPair =
-              AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]);
-          if (aggregationFunctionColumnPair != null) {
-            aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair;
-          } else {
-            hasUnsupportedAggregationFunction = true;
-            break;
-          }
-        }
-        if (!hasUnsupportedAggregationFunction) {
-          FilterContext filter = queryContext.getFilter();
+    if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
+      AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+          StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
+      if (aggregationFunctionColumnPairs != null) {
+        Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+            StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
+        if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
             if (StarTreeUtils
                 .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions,
-                    filter)) {
+                    predicateEvaluatorsMap.keySet())) {
               _transformPlanNode = null;
               _starTreeTransformPlanNode =
-                  new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, filter,
-                      queryContext.getDebugOptions());
+                  new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions,
+                      predicateEvaluatorsMap, queryContext.getDebugOptions());
               return;
             }
           }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 4a267a2..a224125 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -19,13 +19,14 @@
 package org.apache.pinot.core.plan;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
@@ -50,31 +51,20 @@ public class AggregationPlanNode implements PlanNode {
     assert _aggregationFunctions != null;
 
     List<StarTreeV2> starTrees = indexSegment.getStarTrees();
-    if (starTrees != null) {
-      if (!StarTreeUtils.isStarTreeDisabled(queryContext)) {
-        int numAggregationFunctions = _aggregationFunctions.length;
-        AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
-            new AggregationFunctionColumnPair[numAggregationFunctions];
-        boolean hasUnsupportedAggregationFunction = false;
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          AggregationFunctionColumnPair aggregationFunctionColumnPair =
-              AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]);
-          if (aggregationFunctionColumnPair != null) {
-            aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair;
-          } else {
-            hasUnsupportedAggregationFunction = true;
-            break;
-          }
-        }
-        if (!hasUnsupportedAggregationFunction) {
-          FilterContext filter = queryContext.getFilter();
+    if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
+      AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+          StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
+      if (aggregationFunctionColumnPairs != null) {
+        Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+            StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
+        if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
-            if (StarTreeUtils
-                .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null, filter)) {
+            if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null,
+                predicateEvaluatorsMap.keySet())) {
               _transformPlanNode = null;
               _starTreeTransformPlanNode =
-                  new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null, filter,
-                      queryContext.getDebugOptions());
+                  new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null,
+                      predicateEvaluatorsMap, queryContext.getDebugOptions());
               return;
             }
           }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index ed98e98..c7340b2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -20,18 +20,30 @@ package org.apache.pinot.core.startree;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.core.segment.store.SegmentDirectoryPaths;
 import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
 import org.apache.pinot.core.startree.v2.StarTreeV2Constants;
@@ -40,6 +52,7 @@ import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 
 
+@SuppressWarnings("rawtypes")
 public class StarTreeUtils {
   private StarTreeUtils() {
   }
@@ -55,17 +68,103 @@ public class StarTreeUtils {
   }
 
   /**
+   * Extracts the {@link AggregationFunctionColumnPair}s from the given {@link AggregationFunction}s. Returns
+   * {@code null} if any {@link AggregationFunction} cannot be represented as an {@link AggregationFunctionColumnPair}
+   * (e.g. it has multiple arguments, or its argument is not a column).
+   */
+  @Nullable
+  public static AggregationFunctionColumnPair[] extractAggregationFunctionPairs(
+      AggregationFunction[] aggregationFunctions) {
+    int numAggregationFunctions = aggregationFunctions.length;
+    AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+        new AggregationFunctionColumnPair[numAggregationFunctions];
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      AggregationFunctionColumnPair aggregationFunctionColumnPair =
+          AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunctions[i]);
+      if (aggregationFunctionColumnPair != null) {
+        aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair;
+      } else {
+        return null;
+      }
+    }
+    return aggregationFunctionColumnPairs;
+  }
+
+  /**
+   * Extracts a map from the column to a list of {@link PredicateEvaluator}s for it, omitting always-true predicates.
+   * Returns {@code null} if the star-tree cannot (or should not, e.g. an always-false predicate) solve the filter.
+   */
+  @Nullable
+  public static Map<String, List<PredicateEvaluator>> extractPredicateEvaluatorsMap(IndexSegment indexSegment,
+      @Nullable FilterContext filter) {
+    if (filter == null) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = new HashMap<>();
+    Queue<FilterContext> queue = new LinkedList<>();
+    queue.add(filter);
+    FilterContext filterNode;
+    while ((filterNode = queue.poll()) != null) {
+      switch (filterNode.getType()) {
+        case AND:
+          queue.addAll(filterNode.getChildren());
+          break;
+        case OR:
+          // Star-tree does not support OR filter
+          return null;
+        case PREDICATE:
+          Predicate predicate = filterNode.getPredicate();
+          ExpressionContext lhs = predicate.getLhs();
+          if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
+            // Star-tree does not support non-identifier expression
+            return null;
+          }
+          String column = lhs.getIdentifier();
+          DataSource dataSource = indexSegment.getDataSource(column);
+          Dictionary dictionary = dataSource.getDictionary();
+          if (dictionary == null) {
+            // Star-tree does not support non-dictionary encoded dimension
+            return null;
+          }
+          switch (predicate.getType()) {
+            // Do not use star-tree for the following predicates because:
+            //   - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids
+            //   - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids
+            case REGEXP_LIKE:
+            case TEXT_MATCH:
+            case IS_NULL:
+            case IS_NOT_NULL:
+              return null;
+          }
+          PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider
+              .getPredicateEvaluator(predicate, dictionary, dataSource.getDataSourceMetadata().getDataType());
+          if (predicateEvaluator.isAlwaysFalse()) {
+            // Do not use star-tree if there is no matching record
+            return null;
+          }
+          if (!predicateEvaluator.isAlwaysTrue()) {
+            predicateEvaluatorsMap.computeIfAbsent(column, k -> new ArrayList<>()).add(predicateEvaluator);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+    return predicateEvaluatorsMap;
+  }
+
+  /**
    * Returns whether the query is fit for star tree index.
    * <p>The query is fit for star tree index if the following conditions are met:
    * <ul>
    *   <li>Star-tree contains all aggregation function column pairs</li>
    *   <li>All predicate columns and group-by columns are star-tree dimensions</li>
-   *   <li>All predicates are conjoined by AND</li>
    * </ul>
    */
   public static boolean isFitForStarTree(StarTreeV2Metadata starTreeV2Metadata,
       AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions,
-      @Nullable FilterContext filter) {
+      Set<String> predicateColumns) {
     // Check aggregations
     for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
       if (!starTreeV2Metadata.containsFunctionColumnPair(aggregationFunctionColumnPair)) {
@@ -73,8 +172,9 @@ public class StarTreeUtils {
       }
     }
 
-    // Check group-by expressions
     Set<String> starTreeDimensions = new HashSet<>(starTreeV2Metadata.getDimensionsSplitOrder());
+
+    // Check group-by expressions
     if (groupByExpressions != null) {
       Set<String> groupByColumns = new HashSet<>();
       for (ExpressionContext groupByExpression : groupByExpressions) {
@@ -85,45 +185,8 @@ public class StarTreeUtils {
       }
     }
 
-    // Check filters
-    return filter == null || checkFilters(filter, starTreeDimensions);
-  }
-
-  /**
-   * Helper method to check whether all columns in predicates are star-tree dimensions, and all predicates are
-   * conjoined by AND.
-   */
-  private static boolean checkFilters(FilterContext filter, Set<String> starTreeDimensions) {
-    switch (filter.getType()) {
-      case AND:
-        for (FilterContext child : filter.getChildren()) {
-          if (!checkFilters(child, starTreeDimensions)) {
-            return false;
-          }
-        }
-        return true;
-      case OR:
-        return false;
-      case PREDICATE:
-        Predicate predicate = filter.getPredicate();
-        ExpressionContext lhs = predicate.getLhs();
-        if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
-          return false;
-        }
-        switch (predicate.getType()) {
-          // NOTE: Do not use star-tree for the following predicates because:
-          //       - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids
-          //       - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids
-          case REGEXP_LIKE:
-          case TEXT_MATCH:
-          case IS_NULL:
-          case IS_NOT_NULL:
-            return false;
-        }
-        return starTreeDimensions.contains(lhs.getIdentifier());
-      default:
-        throw new IllegalStateException();
-    }
+    // Check predicate columns
+    return starTreeDimensions.containsAll(predicateColumns);
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
index 32d0b57..2037b71 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
@@ -22,7 +22,6 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -41,9 +40,6 @@ import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
 import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
-import org.apache.pinot.core.query.request.context.FilterContext;
-import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.core.startree.StarTree;
 import org.apache.pinot.core.startree.StarTreeNode;
 import org.apache.pinot.core.startree.v2.StarTreeV2;
@@ -121,81 +117,23 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
 
   // Star-tree
   private final StarTreeV2 _starTreeV2;
-  // Set of group-by columns
-  private final Set<String> _groupByColumns;
   // Map from column to predicate evaluators
   private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap;
-  // Map from column to matching dictionary ids
-  private final Map<String, IntSet> _matchingDictIdsMap;
+  // Set of group-by columns
+  private final Set<String> _groupByColumns;
 
   private final Map<String, String> _debugOptions;
   boolean _resultEmpty = false;
 
-  public StarTreeFilterOperator(StarTreeV2 starTreeV2, @Nullable FilterContext filter,
-      @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+  public StarTreeFilterOperator(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
+      @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
     _starTreeV2 = starTreeV2;
-    _groupByColumns = groupByColumns != null ? new HashSet<>(groupByColumns) : Collections.emptySet();
+    _predicateEvaluatorsMap = predicateEvaluatorsMap;
+    _groupByColumns = groupByColumns != null ? new HashSet<>(groupByColumns) : new HashSet<>();
     _debugOptions = debugOptions;
 
-    if (filter != null) {
-      _predicateEvaluatorsMap = new HashMap<>();
-      _matchingDictIdsMap = new HashMap<>();
-
-      // Process the filter tree and get a map from column to a list of predicates applied to it
-      Map<String, List<Predicate>> predicatesMap = getPredicatesMap(filter);
-
-      // Initialize the predicate evaluators map
-      for (Map.Entry<String, List<Predicate>> entry : predicatesMap.entrySet()) {
-        String columnName = entry.getKey();
-        List<Predicate> predicates = entry.getValue();
-        List<PredicateEvaluator> predicateEvaluators = new ArrayList<>();
-
-        DataSource dataSource = starTreeV2.getDataSource(columnName);
-        for (Predicate predicate : predicates) {
-          PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider
-              .getPredicateEvaluator(predicate, dataSource.getDictionary(),
-                  dataSource.getDataSourceMetadata().getDataType());
-          // If predicate is always evaluated false, the result for the filter operator will be empty, early terminate
-          if (predicateEvaluator.isAlwaysFalse()) {
-            _resultEmpty = true;
-            return;
-          } else if (!predicateEvaluator.isAlwaysTrue()) {
-            predicateEvaluators.add(predicateEvaluator);
-          }
-        }
-        if (!predicateEvaluators.isEmpty()) {
-          _predicateEvaluatorsMap.put(columnName, predicateEvaluators);
-        }
-      }
-
-      // Remove columns with predicates from group-by columns because we won't use star node for that column
-      _groupByColumns.removeAll(_predicateEvaluatorsMap.keySet());
-    } else {
-      _predicateEvaluatorsMap = Collections.emptyMap();
-      _matchingDictIdsMap = Collections.emptyMap();
-    }
-  }
-
-  /**
-   * Helper method to process the filter tree and get a map from column to a list of predicates applied to it.
-   */
-  private Map<String, List<Predicate>> getPredicatesMap(FilterContext filter) {
-    Map<String, List<Predicate>> predicatesMap = new HashMap<>();
-    Queue<FilterContext> queue = new LinkedList<>();
-    queue.add(filter);
-
-    while (!queue.isEmpty()) {
-      FilterContext filterNode = queue.remove();
-      if (filterNode.getType() == FilterContext.Type.AND) {
-        queue.addAll(filterNode.getChildren());
-      } else {
-        Predicate predicate = filterNode.getPredicate();
-        String columnName = predicate.getLhs().getIdentifier();
-        predicatesMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(predicate);
-      }
-    }
-
-    return predicatesMap;
+    // Remove columns with predicates from group-by columns because we won't use star node for that column
+    _groupByColumns.removeAll(_predicateEvaluatorsMap.keySet());
   }
 
   @Override
@@ -252,13 +190,14 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
 
   /**
    * Helper method to traverse the star tree, get matching documents and keep track of all the predicate columns that
-   * are not matched.
-   * <p>Return <code>null</code> if no matching dictionary id found for a column (i.e. the result for the filter
-   * operator is empty).
+   * are not matched. Returns {@code null} if no matching dictionary id found for a column (i.e. the result for the
+   * filter operator is empty).
    */
+  @Nullable
   private StarTreeResult traverseStarTree() {
-    MutableRoaringBitmap matchedDocIds = new MutableRoaringBitmap();
+    MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
     Set<String> remainingPredicateColumns = new HashSet<>();
+    Map<String, IntSet> matchingDictIdsMap = new HashMap<>();
 
     StarTree starTree = _starTreeV2.getStarTree();
     List<String> dimensionNames = starTree.getDimensionNames();
@@ -267,19 +206,19 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
     // Use BFS to traverse the star tree
     Queue<SearchEntry> queue = new LinkedList<>();
     queue.add(new SearchEntry(starTreeRootNode, _predicateEvaluatorsMap.keySet(), _groupByColumns));
-    while (!queue.isEmpty()) {
-      SearchEntry searchEntry = queue.remove();
+    SearchEntry searchEntry;
+    while ((searchEntry = queue.poll()) != null) {
       StarTreeNode starTreeNode = searchEntry._starTreeNode;
 
       // If all predicate columns and group-by columns are matched, we can use aggregated document
       if (searchEntry._remainingPredicateColumns.isEmpty() && searchEntry._remainingGroupByColumns.isEmpty()) {
-        matchedDocIds.add(starTreeNode.getAggregatedDocId());
+        matchingDocIds.add(starTreeNode.getAggregatedDocId());
       } else {
         // For leaf node, because we haven't exhausted all predicate columns and group-by columns, we cannot use
         // the aggregated document. Add the range of documents for this node to the bitmap, and keep track of the
         // remaining predicate columns for this node
         if (starTreeNode.isLeaf()) {
-          matchedDocIds.add(starTreeNode.getStartDocId(), starTreeNode.getEndDocId());
+          matchingDocIds.add((long) starTreeNode.getStartDocId(), starTreeNode.getEndDocId());
           remainingPredicateColumns.addAll(searchEntry._remainingPredicateColumns);
         } else {
           // For non-leaf node, proceed to next level
@@ -290,7 +229,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
             Set<String> newRemainingPredicateColumns = new HashSet<>(searchEntry._remainingPredicateColumns);
             newRemainingPredicateColumns.remove(nextDimension);
 
-            IntSet matchingDictIds = _matchingDictIdsMap.get(nextDimension);
+            IntSet matchingDictIds = matchingDictIdsMap.get(nextDimension);
             if (matchingDictIds == null) {
               matchingDictIds = getMatchingDictIds(_predicateEvaluatorsMap.get(nextDimension));
 
@@ -299,7 +238,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
                 return null;
               }
 
-              _matchingDictIdsMap.put(nextDimension, matchingDictIds);
+              matchingDictIdsMap.put(nextDimension, matchingDictIds);
             }
 
             int numMatchingDictIds = matchingDictIds.size();
@@ -358,7 +297,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
       }
     }
 
-    return new StarTreeResult(matchedDocIds, remainingPredicateColumns);
+    return new StarTreeResult(matchingDocIds, remainingPredicateColumns);
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
index c418d21..4fe485d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
@@ -18,22 +18,24 @@
  */
 package org.apache.pinot.core.startree.plan;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.startree.v2.StarTreeV2;
 
 
 public class StarTreeDocIdSetPlanNode implements PlanNode {
   private final StarTreeFilterPlanNode _starTreeFilterPlanNode;
 
-  public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter,
+  public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
       @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
-    _starTreeFilterPlanNode = new StarTreeFilterPlanNode(starTreeV2, filter, groupByColumns, debugOptions);
+    _starTreeFilterPlanNode =
+        new StarTreeFilterPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
index 9628086..82b3391 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
@@ -18,31 +18,32 @@
  */
 package org.apache.pinot.core.startree.plan;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.startree.operator.StarTreeFilterOperator;
 import org.apache.pinot.core.startree.v2.StarTreeV2;
 
 
 public class StarTreeFilterPlanNode implements PlanNode {
   private final StarTreeV2 _starTreeV2;
-  private final FilterContext _filter;
+  private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap;
   private final Set<String> _groupByColumns;
   private final Map<String, String> _debugOptions;
 
-  public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter,
-      @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+  public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
+      Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
     _starTreeV2 = starTreeV2;
-    _filter = filter;
+    _predicateEvaluatorsMap = predicateEvaluatorsMap;
     _groupByColumns = groupByColumns;
     _debugOptions = debugOptions;
   }
 
   @Override
   public StarTreeFilterOperator run() {
-    return new StarTreeFilterOperator(_starTreeV2, _filter, _groupByColumns, _debugOptions);
+    return new StarTreeFilterOperator(_starTreeV2, _predicateEvaluatorsMap, _groupByColumns, _debugOptions);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
index e6d482d..7857e57 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
@@ -19,13 +19,14 @@
 package org.apache.pinot.core.startree.plan;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.startree.v2.StarTreeV2;
 
 
@@ -34,13 +35,14 @@ public class StarTreeProjectionPlanNode implements PlanNode {
   private final StarTreeDocIdSetPlanNode _starTreeDocIdSetPlanNode;
 
   public StarTreeProjectionPlanNode(StarTreeV2 starTreeV2, Set<String> projectionColumns,
-      @Nullable FilterContext filter, @Nullable Set<String> groupByColumns,
+      Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, Set<String> groupByColumns,
       @Nullable Map<String, String> debugOptions) {
-    _dataSourceMap = new HashMap<>(projectionColumns.size());
+    _dataSourceMap = new HashMap<>();
     for (String projectionColumn : projectionColumns) {
       _dataSourceMap.put(projectionColumn, starTreeV2.getDataSource(projectionColumn));
     }
-    _starTreeDocIdSetPlanNode = new StarTreeDocIdSetPlanNode(starTreeV2, filter, groupByColumns, debugOptions);
+    _starTreeDocIdSetPlanNode =
+        new StarTreeDocIdSetPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
index 8f50fa5..6a316fd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
@@ -25,10 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
 import org.apache.pinot.core.startree.v2.StarTreeV2;
 
@@ -39,7 +39,7 @@ public class StarTreeTransformPlanNode implements PlanNode {
 
   public StarTreeTransformPlanNode(StarTreeV2 starTreeV2,
       AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions,
-      @Nullable FilterContext filter, @Nullable Map<String, String> debugOptions) {
+      Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Map<String, String> debugOptions) {
     Set<String> projectionColumns = new HashSet<>();
     for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
       projectionColumns.add(aggregationFunctionColumnPair.toColumnName());
@@ -54,10 +54,11 @@ public class StarTreeTransformPlanNode implements PlanNode {
       projectionColumns.addAll(groupByColumns);
     } else {
       _groupByExpressions = Collections.emptyList();
-      groupByColumns = null;
+      groupByColumns = Collections.emptySet();
     }
     _starTreeProjectionPlanNode =
-        new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, filter, groupByColumns, debugOptions);
+        new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, predicateEvaluatorsMap, groupByColumns,
+            debugOptions);
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index 7b46277..4ea61cf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -40,10 +40,10 @@ import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.FilterPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -52,6 +52,7 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
 import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeFilterPlanNode;
 import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder;
 import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder.BuildMode;
@@ -185,15 +186,11 @@ abstract class BaseStarTreeV2Test<R, A> {
 
     // Aggregations
     AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null;
+    assertNotNull(aggregationFunctions);
     int numAggregations = aggregationFunctions.length;
-    List<AggregationFunctionColumnPair> functionColumnPairs = new ArrayList<>(numAggregations);
-    for (AggregationFunction aggregationFunction : aggregationFunctions) {
-      AggregationFunctionColumnPair aggregationFunctionColumnPair =
-          AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunction);
-      assertNotNull(aggregationFunctionColumnPair);
-      functionColumnPairs.add(aggregationFunctionColumnPair);
-    }
+    AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+        StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
+    assertNotNull(aggregationFunctionColumnPairs);
 
     // Group-by columns
     Set<String> groupByColumnSet = new HashSet<>();
@@ -208,16 +205,15 @@ abstract class BaseStarTreeV2Test<R, A> {
 
     // Filter
     FilterContext filter = queryContext.getFilter();
+    Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+        StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, filter);
+    assertNotNull(predicateEvaluatorsMap);
 
     // Extract values with star-tree
-    PlanNode starTreeFilterPlanNode;
-    if (groupByColumns.isEmpty()) {
-      starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, null, null);
-    } else {
-      starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, groupByColumnSet, null);
-    }
+    PlanNode starTreeFilterPlanNode =
+        new StarTreeFilterPlanNode(_starTreeV2, predicateEvaluatorsMap, groupByColumnSet, null);
     List<ForwardIndexReader> starTreeAggregationColumnReaders = new ArrayList<>(numAggregations);
-    for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) {
+    for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
       starTreeAggregationColumnReaders
           .add(_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex());
     }
@@ -232,7 +228,7 @@ abstract class BaseStarTreeV2Test<R, A> {
     PlanNode nonStarTreeFilterPlanNode = new FilterPlanNode(_indexSegment, queryContext);
     List<ForwardIndexReader> nonStarTreeAggregationColumnReaders = new ArrayList<>(numAggregations);
     List<Dictionary> nonStarTreeAggregationColumnDictionaries = new ArrayList<>(numAggregations);
-    for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) {
+    for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
       if (aggregationFunctionColumnPair.getFunctionType() == AggregationFunctionType.COUNT) {
         nonStarTreeAggregationColumnReaders.add(null);
         nonStarTreeAggregationColumnDictionaries.add(null);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index f2bf7dc..0d3c3b0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -477,6 +477,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
     assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
 
+    // Should be able to use the star-tree with an additional match-all predicate on another dimension
+    firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1 + " AND DaysSinceEpoch > 16070");
+    assertEquals(firstQueryResponse.get("aggregationResults").get(0).get("value").asInt(), firstQueryResult);
+    assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
+    assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
+
     // Test the second query
     JsonNode secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
     int secondQueryResult = secondQueryResponse.get("aggregationResults").get(0).get("value").asInt();
@@ -518,6 +524,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
     assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
 
+    // Should be able to use the star-tree with an additional match-all predicate on another dimension
+    secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2 + " AND DaysSinceEpoch > 16070");
+    assertEquals(secondQueryResponse.get("aggregationResults").get(0).get("value").asInt(), secondQueryResult);
+    assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs);
+    assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS);
+
     // Remove the star-tree index config and trigger reload
     indexingConfig.setStarTreeIndexConfigs(null);
     updateTableConfig(tableConfig);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org