You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/27 05:12:55 UTC

[pinot] branch master updated: Introduce OR Predicate Execution On Star Tree Index (#7184)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 717940b  Introduce OR Predicate Execution On Star Tree Index (#7184)
717940b is described below

commit 717940b5d1c0b9aa0fea149384f12c77b31cefd7
Author: Atri Sharma <at...@gmail.com>
AuthorDate: Tue Jul 27 10:42:35 2021 +0530

    Introduce OR Predicate Execution On Star Tree Index (#7184)
    
    Allows OR predicates to be executed on star-tree indexes. An OR predicate qualifies for star-tree execution
    only when all of its children reference a single dimension. This ensures that we do not double-count
    pre-aggregated documents while traversing the tree for two dimensions.
---
 .../plan/AggregationGroupByOrderByPlanNode.java    |   5 +-
 .../core/plan/AggregationGroupByPlanNode.java      |   4 +-
 .../pinot/core/plan/AggregationPlanNode.java       |   5 +-
 .../core/startree/CompositePredicateEvaluator.java |  57 ++++++++
 .../apache/pinot/core/startree/StarTreeUtils.java  | 151 +++++++++++++++++----
 .../startree/operator/StarTreeFilterOperator.java  | 117 ++++++++++------
 .../startree/plan/StarTreeDocIdSetPlanNode.java    |   7 +-
 .../core/startree/plan/StarTreeFilterPlanNode.java |   9 +-
 .../startree/plan/StarTreeProjectionPlanNode.java  |   4 +-
 .../startree/plan/StarTreeTransformPlanNode.java   |   5 +-
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java |  28 +++-
 11 files changed, 302 insertions(+), 90 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index 915eb5e..47df15a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -22,17 +22,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 
+
 /**
  * The <code>AggregationGroupByOrderByPlanNode</code> class provides the execution plan for aggregation group-by order-by query on a
  * single segment.
@@ -67,7 +68,7 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode {
       AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
           StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
       if (aggregationFunctionColumnPairs != null) {
-        Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+        Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
             StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
         if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
index 25db1dd..9554bc9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
@@ -22,11 +22,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -64,7 +64,7 @@ public class AggregationGroupByPlanNode implements PlanNode {
       AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
           StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
       if (aggregationFunctionColumnPairs != null) {
-        Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+        Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
             StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
         if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index a31ec13..ec47d8e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -22,11 +22,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -55,13 +55,13 @@ public class AggregationPlanNode implements PlanNode {
       AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
           StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions);
       if (aggregationFunctionColumnPairs != null) {
-        Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+        Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
             StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter());
         if (predicateEvaluatorsMap != null) {
           for (StarTreeV2 starTreeV2 : starTrees) {
             if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null,
                 predicateEvaluatorsMap.keySet())) {
               _transformPlanNode = null;
               _starTreeTransformPlanNode =
                   new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null,
                       predicateEvaluatorsMap, queryContext.getDebugOptions());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/CompositePredicateEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/CompositePredicateEvaluator.java
new file mode 100644
index 0000000..9424bc0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/CompositePredicateEvaluator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree;
+
+import java.util.List;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+
+
+/**
+ * Represents a composite predicate on a single column.
+ *
+ * A composite predicate evaluator represents a single predicate evaluator or multiple predicate evaluators conjoined
+ * with OR, all applied to the same column.
+ * Consider the given predicate: (d1 < 10 OR d1 > 50). A composite predicate will represent two predicates -- (d1 < 10)
+ * and (d1 > 50) and represent that they are related by the operator OR.
+ */
+public class CompositePredicateEvaluator {
+  private final List<PredicateEvaluator> _predicateEvaluators;
+
+  /**
+   * Creates a composite predicate evaluator from the given non-empty list of predicate evaluators conjoined with OR.
+   *
+   * @throws IllegalArgumentException if {@code predicateEvaluators} is empty
+   */
+  public CompositePredicateEvaluator(List<PredicateEvaluator> predicateEvaluators) {
+    // NOTE: Fail fast even when assertions are disabled because an empty list would silently match nothing
+    if (predicateEvaluators.isEmpty()) {
+      throw new IllegalArgumentException("At least one predicate evaluator is required");
+    }
+    _predicateEvaluators = predicateEvaluators;
+  }
+
+  public List<PredicateEvaluator> getPredicateEvaluators() {
+    return _predicateEvaluators;
+  }
+
+  /**
+   * Applies a dictionary id to the composite predicate evaluator. Returns {@code true} if the dictionary id matches any
+   * predicate evaluator, {@code false} otherwise.
+   */
+  public boolean apply(int dictId) {
+    for (PredicateEvaluator predicateEvaluator : _predicateEvaluators) {
+      if (predicateEvaluator.applySV(dictId)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index e80d5e3..15fa414 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.predicate.Predicate;
@@ -84,15 +85,20 @@ public class StarTreeUtils {
   /**
-   * Extracts a map from the column to a list of {@link PredicateEvaluator}s for it. Returns {@code null} if the filter
-   * cannot be solved by the star-tree.
+   * Extracts a map from the column to a list of {@link CompositePredicateEvaluator}s for it. Returns {@code null} if
+   * the filter cannot be solved by the star-tree.
+   *
+   * A predicate can be simple (d1 > 10), a conjunction of predicates (d1 > 10 AND d2 < 50), or multi-level
+   * (d1 > 50 AND (d2 > 10 OR d2 < 35)).
+   * This method represents a list of CompositePredicates per dimension. For each dimension, all CompositePredicates in
+   * the list are implicitly ANDed together. Any OR predicates are nested within a CompositePredicate.
    */
   @Nullable
-  public static Map<String, List<PredicateEvaluator>> extractPredicateEvaluatorsMap(IndexSegment indexSegment,
+  public static Map<String, List<CompositePredicateEvaluator>> extractPredicateEvaluatorsMap(IndexSegment indexSegment,
       @Nullable FilterContext filter) {
     if (filter == null) {
       return Collections.emptyMap();
     }
 
-    Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = new HashMap<>();
+    Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap = new HashMap<>();
     Queue<FilterContext> queue = new LinkedList<>();
     queue.add(filter);
     FilterContext filterNode;
@@ -102,40 +108,26 @@ public class StarTreeUtils {
           queue.addAll(filterNode.getChildren());
           break;
         case OR:
-          // Star-tree does not support OR filter
-          return null;
-        case PREDICATE:
-          Predicate predicate = filterNode.getPredicate();
-          ExpressionContext lhs = predicate.getLhs();
-          if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
-            // Star-tree does not support non-identifier expression
-            return null;
-          }
-          String column = lhs.getIdentifier();
-          DataSource dataSource = indexSegment.getDataSource(column);
-          Dictionary dictionary = dataSource.getDictionary();
-          if (dictionary == null) {
-            // Star-tree does not support non-dictionary encoded dimension
+          Pair<String, List<PredicateEvaluator>> pair = isOrClauseValidForStarTree(indexSegment, filterNode);
+          if (pair == null) {
             return null;
           }
-          switch (predicate.getType()) {
-            // Do not use star-tree for the following predicates because:
-            //   - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids
-            //   - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids
-            case REGEXP_LIKE:
-            case TEXT_MATCH:
-            case IS_NULL:
-            case IS_NOT_NULL:
-              return null;
+          List<PredicateEvaluator> predicateEvaluators = pair.getRight();
+          // NOTE: Empty list means always true
+          if (!predicateEvaluators.isEmpty()) {
+            predicateEvaluatorsMap.computeIfAbsent(pair.getLeft(), k -> new ArrayList<>())
+                .add(new CompositePredicateEvaluator(predicateEvaluators));
           }
-          PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider
-              .getPredicateEvaluator(predicate, dictionary, dataSource.getDataSourceMetadata().getDataType());
-          if (predicateEvaluator.isAlwaysFalse()) {
-            // Do not use star-tree if there is no matching record
+          break;
+        case PREDICATE:
+          Predicate predicate = filterNode.getPredicate();
+          PredicateEvaluator predicateEvaluator = getPredicateEvaluatorForPredicate(indexSegment, predicate);
+          if (predicateEvaluator == null) {
             return null;
           }
           if (!predicateEvaluator.isAlwaysTrue()) {
-            predicateEvaluatorsMap.computeIfAbsent(column, k -> new ArrayList<>()).add(predicateEvaluator);
+            predicateEvaluatorsMap.computeIfAbsent(predicate.getLhs().getIdentifier(), k -> new ArrayList<>())
+                .add(new CompositePredicateEvaluator(Collections.singletonList(predicateEvaluator)));
           }
           break;
         default:
@@ -179,4 +171,111 @@ public class StarTreeUtils {
     // Check predicate columns
     return starTreeDimensions.containsAll(predicateColumns);
   }
+
+  /**
+   * Evaluates whether the given OR clause is valid for StarTree processing.
+   * StarTree supports OR predicates on a single dimension only (d1 < 10 OR d1 > 50).
+   *
+   * @return The pair of single identifier and predicate evaluators applied to it if true; {@code null} if the OR clause
+   *         cannot be solved with star-tree (nested AND, multiple columns, unsupported predicate, or all predicates
+   *         always false); empty predicate evaluator list if the OR clause always evaluates to true.
+   */
+  @Nullable
+  private static Pair<String, List<PredicateEvaluator>> isOrClauseValidForStarTree(IndexSegment indexSegment,
+      FilterContext filter) {
+    assert filter.getType() == FilterContext.Type.OR;
+
+    List<Predicate> predicates = new ArrayList<>();
+    if (!extractOrClausePredicates(filter, predicates)) {
+      // Nested AND under OR cannot be solved with star-tree
+      return null;
+    }
+
+    String identifier = null;
+    List<PredicateEvaluator> predicateEvaluators = new ArrayList<>();
+    for (Predicate predicate : predicates) {
+      PredicateEvaluator predicateEvaluator = getPredicateEvaluatorForPredicate(indexSegment, predicate);
+      if (predicateEvaluator == null) {
+        // The predicate cannot be solved with star-tree
+        return null;
+      }
+      if (predicateEvaluator.isAlwaysTrue()) {
+        // Use empty predicate evaluators to represent always true
+        return Pair.of(null, Collections.emptyList());
+      }
+      if (!predicateEvaluator.isAlwaysFalse()) {
+        String predicateIdentifier = predicate.getLhs().getIdentifier();
+        if (identifier == null) {
+          identifier = predicateIdentifier;
+        } else {
+          if (!identifier.equals(predicateIdentifier)) {
+            // The predicates are applied to multiple columns
+            return null;
+          }
+        }
+        predicateEvaluators.add(predicateEvaluator);
+      }
+    }
+    // NOTE: When all the predicates are always false, the OR clause never matches. Do not use star-tree in that case
+    //       because the caller treats an empty predicate evaluator list as always true.
+    return identifier != null ? Pair.of(identifier, predicateEvaluators) : null;
+  }
+
+  /**
+   * Extracts the predicates under the given OR clause, returns {@code false} if there is nested AND under OR clause.
+   */
+  private static boolean extractOrClausePredicates(FilterContext filter, List<Predicate> predicates) {
+    assert filter.getType() == FilterContext.Type.OR;
+
+    for (FilterContext child : filter.getChildren()) {
+      switch (child.getType()) {
+        case AND:
+          return false;
+        case OR:
+          if (!extractOrClausePredicates(child, predicates)) {
+            return false;
+          }
+          // NOTE: Must not fall through to the PREDICATE case; the nested OR has already been expanded above
+          break;
+        case PREDICATE:
+          predicates.add(child.getPredicate());
+          break;
+        default:
+          throw new IllegalStateException("Unsupported filter type: " + child.getType());
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the predicate evaluator for the given predicate, or {@code null} if the predicate cannot be solved with
+   * star-tree.
+   */
+  @Nullable
+  private static PredicateEvaluator getPredicateEvaluatorForPredicate(IndexSegment indexSegment, Predicate predicate) {
+    ExpressionContext lhs = predicate.getLhs();
+    if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
+      // Star-tree does not support non-identifier expression
+      return null;
+    }
+    String column = lhs.getIdentifier();
+    DataSource dataSource = indexSegment.getDataSource(column);
+    Dictionary dictionary = dataSource.getDictionary();
+    if (dictionary == null) {
+      // Star-tree does not support non-dictionary encoded dimension
+      return null;
+    }
+    switch (predicate.getType()) {
+      // Do not use star-tree for the following predicates because:
+      //   - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids
+      //   - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids
+      case REGEXP_LIKE:
+      case TEXT_MATCH:
+      case IS_NULL:
+      case IS_NOT_NULL:
+        return null;
+    }
+    return PredicateEvaluatorProvider
+        .getPredicateEvaluator(predicate, dictionary, dataSource.getDataSourceMetadata().getDataType());
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
index 2872edb..20b1558 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java
@@ -40,6 +40,7 @@ import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
 import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.startree.StarTree;
 import org.apache.pinot.segment.spi.index.startree.StarTreeNode;
@@ -53,7 +54,8 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  * <p>High-level algorithm:
  * <ul>
  *   <li>
- *     Traverse the filter tree and generate a map from column to a list of {@link PredicateEvaluator}s applied to it
+ *     Traverse the filter tree and generate a map from column to a list of {@link CompositePredicateEvaluator}s applied
+ *     to it
  *   </li>
  *   <li>
  *     Traverse the star tree index, try to match as many predicates as possible, add the matching documents into a
@@ -75,7 +77,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  *     </ul>
  *   </li>
  *   <li>
- *     For each remaining predicate columns, use the list of {@link PredicateEvaluator}s to generate separate
+ *     For each remaining predicate columns, use the list of {@link CompositePredicateEvaluator}s to generate separate
  *     {@link BaseFilterOperator}s for it
  *   </li>
  *   <li>Conjoin all {@link BaseFilterOperator}s with AND if we have multiple of them</li>
@@ -119,15 +121,17 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
   // Star-tree
   private final StarTreeV2 _starTreeV2;
   // Map from column to predicate evaluators
-  private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap;
+  private final Map<String, List<CompositePredicateEvaluator>> _predicateEvaluatorsMap;
   // Set of group-by columns
   private final Set<String> _groupByColumns;
 
   private final Map<String, String> _debugOptions;
+
   boolean _resultEmpty = false;
 
-  public StarTreeFilterOperator(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
-      Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+  public StarTreeFilterOperator(StarTreeV2 starTreeV2,
+      Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap, Set<String> groupByColumns,
+      @Nullable Map<String, String> debugOptions) {
     _starTreeV2 = starTreeV2;
     _predicateEvaluatorsMap = predicateEvaluatorsMap;
     _debugOptions = debugOptions;
@@ -183,10 +187,26 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
 
     // Add remaining predicates
     for (String remainingPredicateColumn : starTreeResult._remainingPredicateColumns) {
-      List<PredicateEvaluator> predicateEvaluators = _predicateEvaluatorsMap.get(remainingPredicateColumn);
+      List<CompositePredicateEvaluator> compositePredicateEvaluators =
+          _predicateEvaluatorsMap.get(remainingPredicateColumn);
       DataSource dataSource = _starTreeV2.getDataSource(remainingPredicateColumn);
-      for (PredicateEvaluator predicateEvaluator : predicateEvaluators) {
-        childFilterOperators.add(FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs));
+      for (CompositePredicateEvaluator compositePredicateEvaluator : compositePredicateEvaluators) {
+        List<PredicateEvaluator> predicateEvaluators = compositePredicateEvaluator.getPredicateEvaluators();
+        int numPredicateEvaluators = predicateEvaluators.size();
+        if (numPredicateEvaluators == 1) {
+          // Single predicate evaluator
+          childFilterOperators
+              .add(FilterOperatorUtils.getLeafFilterOperator(predicateEvaluators.get(0), dataSource, numDocs));
+        } else {
+          // Predicate evaluators conjoined with OR
+          List<BaseFilterOperator> orChildFilterOperators = new ArrayList<>(numPredicateEvaluators);
+          for (PredicateEvaluator childPredicateEvaluator : predicateEvaluators) {
+            orChildFilterOperators
+                .add(FilterOperatorUtils.getLeafFilterOperator(childPredicateEvaluator, dataSource, numDocs));
+          }
+          childFilterOperators
+              .add(FilterOperatorUtils.getOrFilterOperator(orChildFilterOperators, numDocs, _debugOptions));
+        }
       }
     }
 
@@ -306,61 +326,69 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
   }
 
   /**
-   * Helper method to get a set of matching dictionary ids from a list of predicate evaluators conjoined with AND.
+   * Helper method to get a set of matching dictionary ids from a list of composite predicate evaluators conjoined with
+   * AND.
+   * <p>When there are multiple composite predicate evaluators:
    * <ul>
    *   <li>
-   *     We sort all predicate evaluators with priority: EQ > IN > RANGE > NOT_IN/NEQ > REGEXP_LIKE so that we process
-   *     less dictionary ids.
+   *     We sort all composite predicate evaluators with priority: EQ > IN > RANGE > NOT_IN/NEQ > multiple predicate
+   *     evaluators conjoined with OR so that we process fewer dictionary ids.
    *   </li>
    *   <li>
-   *     For the first predicate evaluator, we get all the matching dictionary ids, then apply them to other predicate
-   *     evaluators to get the final set of matching dictionary ids.
+   *     For the first composite predicate evaluator, we get all the matching dictionary ids, then apply them to other
+   *     composite predicate evaluators to get the final set of matching dictionary ids.
    *   </li>
    * </ul>
    */
-  private IntSet getMatchingDictIds(List<PredicateEvaluator> predicateEvaluators) {
+  private IntSet getMatchingDictIds(List<CompositePredicateEvaluator> compositePredicateEvaluators) {
+    int numCompositePredicateEvaluators = compositePredicateEvaluators.size();
+    if (numCompositePredicateEvaluators == 1) {
+      return getMatchingDictIds(compositePredicateEvaluators.get(0));
+    }
+
     // Sort the predicate evaluators so that we process less dictionary ids
-    predicateEvaluators.sort(new Comparator<PredicateEvaluator>() {
+    compositePredicateEvaluators.sort(new Comparator<CompositePredicateEvaluator>() {
       @Override
-      public int compare(PredicateEvaluator o1, PredicateEvaluator o2) {
+      public int compare(CompositePredicateEvaluator o1, CompositePredicateEvaluator o2) {
         return getPriority(o1) - getPriority(o2);
       }
 
-      int getPriority(PredicateEvaluator predicateEvaluator) {
-        switch (predicateEvaluator.getPredicateType()) {
-          case EQ:
-            return 1;
-          case IN:
-            return 2;
-          case RANGE:
-            return 3;
-          case NOT_EQ:
-          case NOT_IN:
-            return 4;
-          default:
-            throw new UnsupportedOperationException();
+      int getPriority(CompositePredicateEvaluator compositePredicateEvaluator) {
+        List<PredicateEvaluator> predicateEvaluators = compositePredicateEvaluator.getPredicateEvaluators();
+        if (predicateEvaluators.size() == 1) {
+          switch (predicateEvaluators.get(0).getPredicateType()) {
+            case EQ:
+              return 1;
+            case IN:
+              return 2;
+            case RANGE:
+              return 3;
+            case NOT_EQ:
+            case NOT_IN:
+              return 4;
+            default:
+              throw new UnsupportedOperationException();
+          }
+        } else {
+          // Process OR last
+          return 5;
         }
       }
     });
 
     // Initialize matching dictionary ids with the first predicate evaluator
-    IntSet matchingDictIds = new IntOpenHashSet();
-    PredicateEvaluator firstPredicateEvaluator = predicateEvaluators.get(0);
-    for (int matchingDictId : firstPredicateEvaluator.getMatchingDictIds()) {
-      matchingDictIds.add(matchingDictId);
-    }
+    IntSet matchingDictIds = getMatchingDictIds(compositePredicateEvaluators.get(0));
 
     // Process other predicate evaluators
-    int numPredicateEvaluators = predicateEvaluators.size();
-    for (int i = 1; i < numPredicateEvaluators; i++) {
+    for (int i = 1; i < numCompositePredicateEvaluators; i++) {
       // We don't need to apply other predicate evaluators if all matching dictionary ids have already been removed
       if (matchingDictIds.isEmpty()) {
         return matchingDictIds;
       }
-      PredicateEvaluator predicateEvaluator = predicateEvaluators.get(i);
+      CompositePredicateEvaluator compositePredicateEvaluator = compositePredicateEvaluators.get(i);
       IntIterator iterator = matchingDictIds.iterator();
       while (iterator.hasNext()) {
-        if (!predicateEvaluator.applySV(iterator.nextInt())) {
+        if (!compositePredicateEvaluator.apply(iterator.nextInt())) {
           iterator.remove();
         }
       }
@@ -368,4 +396,17 @@ public class StarTreeFilterOperator extends BaseFilterOperator {
 
     return matchingDictIds;
   }
+
+  /**
+   * Returns the union of the matching dictionary ids of all the (OR-conjoined) predicate evaluators.
+   */
+  private IntSet getMatchingDictIds(CompositePredicateEvaluator compositePredicateEvaluator) {
+    IntSet matchingDictIds = new IntOpenHashSet();
+    for (PredicateEvaluator predicateEvaluator : compositePredicateEvaluator.getPredicateEvaluators()) {
+      for (int matchingDictId : predicateEvaluator.getMatchingDictIds()) {
+        matchingDictIds.add(matchingDictId);
+      }
+    }
+    return matchingDictIds;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
index a463bcd..a27e524 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java
@@ -23,17 +23,18 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.operator.DocIdSetOperator;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 
 
 public class StarTreeDocIdSetPlanNode implements PlanNode {
   private final StarTreeFilterPlanNode _starTreeFilterPlanNode;
 
-  public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
-      @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+  public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2,
+      Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns,
+      @Nullable Map<String, String> debugOptions) {
     _starTreeFilterPlanNode =
         new StarTreeFilterPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
index b7e38be..78a0aa4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java
@@ -22,20 +22,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.core.startree.operator.StarTreeFilterOperator;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 
 
 public class StarTreeFilterPlanNode implements PlanNode {
   private final StarTreeV2 _starTreeV2;
-  private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap;
+  private final Map<String, List<CompositePredicateEvaluator>> _predicateEvaluatorsMap;
   private final Set<String> _groupByColumns;
   private final Map<String, String> _debugOptions;
 
-  public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap,
-      @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) {
+  public StarTreeFilterPlanNode(StarTreeV2 starTreeV2,
+      Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns,
+      @Nullable Map<String, String> debugOptions) {
     _starTreeV2 = starTreeV2;
     _predicateEvaluatorsMap = predicateEvaluatorsMap;
     _groupByColumns = groupByColumns;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
index 9ab5262..b27ebcd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.operator.ProjectionOperator;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 
@@ -35,7 +35,7 @@ public class StarTreeProjectionPlanNode implements PlanNode {
   private final StarTreeDocIdSetPlanNode _starTreeDocIdSetPlanNode;
 
   public StarTreeProjectionPlanNode(StarTreeV2 starTreeV2, Set<String> projectionColumns,
-      Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns,
+      Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns,
       @Nullable Map<String, String> debugOptions) {
     _dataSourceMap = new HashMap<>();
     for (String projectionColumn : projectionColumns) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
index 8be3e13..e7aeaef 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
@@ -26,9 +26,9 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 
@@ -39,7 +39,8 @@ public class StarTreeTransformPlanNode implements PlanNode {
 
   public StarTreeTransformPlanNode(StarTreeV2 starTreeV2,
       AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions,
-      Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Map<String, String> debugOptions) {
+      Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap,
+      @Nullable Map<String, String> debugOptions) {
     Set<String> projectionColumns = new HashSet<>();
     for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) {
       projectionColumns.add(aggregationFunctionColumnPair.toColumnName());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index 2e9de2b..648934e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -33,12 +33,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.core.common.BlockDocIdIterator;
-import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.plan.FilterPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.startree.CompositePredicateEvaluator;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeFilterPlanNode;
 import org.apache.pinot.segment.local.aggregator.ValueAggregator;
@@ -94,7 +94,15 @@ abstract class BaseStarTreeV2Test<R, A> {
   private static final String DIMENSION_D2 = "d2";
   private static final int DIMENSION_CARDINALITY = 100;
   private static final String METRIC = "m";
-  private static final String QUERY_FILTER = " WHERE d1 = 0 AND d2 < 10";
+  private static final String QUERY_FILTER_AND = " WHERE d1 = 0 AND d2 < 10";
+  // StarTree supports an OR predicate only when all of its children reference the same single dimension
+  private static final String QUERY_FILTER_OR = " WHERE d1 > 10 OR d1 < 50";
+  private static final String QUERY_FILTER_COMPLEX_OR_MULTIPLE_DIMENSIONS = " WHERE d2 < 95 AND (d1 > 10 OR d1 < 50)";
+  private static final String QUERY_FILTER_COMPLEX_AND_MULTIPLE_DIMENSIONS_THREE_PREDICATES =
+      " WHERE d2 < 95 AND d2 > 25 AND (d1 > 10 OR d1 < 50)";
+  private static final String QUERY_FILTER_COMPLEX_OR_MULTIPLE_DIMENSIONS_THREE_PREDICATES =
+      " WHERE (d2 > 95 OR d2 < 25) AND (d1 > 10 OR d1 < 50)";
+  private static final String QUERY_FILTER_COMPLEX_OR_SINGLE_DIMENSION = " WHERE d1 = 95 AND (d1 > 90 OR d1 < 100)";
   private static final String QUERY_GROUP_BY = " GROUP BY d2";
 
   private ValueAggregator _valueAggregator;
@@ -170,9 +178,19 @@ abstract class BaseStarTreeV2Test<R, A> {
 
     String baseQuery = String.format("SELECT %s FROM %s", aggregation, TABLE_NAME);
     testQuery(baseQuery);
-    testQuery(baseQuery + QUERY_FILTER);
+    testQuery(baseQuery + QUERY_FILTER_AND);
+    testQuery(baseQuery + QUERY_FILTER_OR);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_OR_MULTIPLE_DIMENSIONS);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_AND_MULTIPLE_DIMENSIONS_THREE_PREDICATES);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_OR_MULTIPLE_DIMENSIONS_THREE_PREDICATES);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_OR_SINGLE_DIMENSION);
     testQuery(baseQuery + QUERY_GROUP_BY);
-    testQuery(baseQuery + QUERY_FILTER + QUERY_GROUP_BY);
+    testQuery(baseQuery + QUERY_FILTER_AND + QUERY_GROUP_BY);
+    testQuery(baseQuery + QUERY_FILTER_OR + QUERY_GROUP_BY);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_OR_MULTIPLE_DIMENSIONS + QUERY_GROUP_BY);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_OR_MULTIPLE_DIMENSIONS_THREE_PREDICATES + QUERY_GROUP_BY);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_AND_MULTIPLE_DIMENSIONS_THREE_PREDICATES + QUERY_GROUP_BY);
+    testQuery(baseQuery + QUERY_FILTER_COMPLEX_OR_SINGLE_DIMENSION + QUERY_GROUP_BY);
   }
 
   @AfterClass
@@ -207,7 +225,7 @@ abstract class BaseStarTreeV2Test<R, A> {
 
     // Filter
     FilterContext filter = queryContext.getFilter();
-    Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap =
+    Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
         StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, filter);
     assertNotNull(predicateEvaluatorsMap);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org