You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/30 04:16:47 UTC

[incubator-pinot] branch master updated: Add FilterOptimizer which supports optimizing both PQL and SQL query filter (#6056)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 009ab53  Add FilterOptimizer which supports optimizing both PQL and SQL query filter (#6056)
009ab53 is described below

commit 009ab53d1943829803fe308205dd951be2caffcb
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 29 21:16:34 2020 -0700

    Add FilterOptimizer which supports optimizing both PQL and SQL query filter (#6056)
    
    Add `FilterOptimizer` which supports optimizing query filter from both `BrokerRequest` and `PinotQuery`.
    `FilterOptimizer` will replace `FilterQueryTreeOptimizer` which only works on `BrokerRequest` query filter.
    In order to fully support SQL (#4219), the query optimizer should perform the same optimization to `PinotQuery` as `BrokerRequest`.
    
    Add `FlattenAndOrFilterOptimizer` to replace `FlattenNestedPredicatesFilterQueryTreeOptimizer`, and removes the limitation of flatten depth
    Add `MergeEqInFilterOptimizer` to replace `MultipleOrEqualitiesToInClauseFilterQueryTreeOptimizer`
    Add `MergeRangeFilterOptimizer` to replace `RangeMergeOptimizer`, and supports merging range for all single-value columns (based on schema)
    
    This PR only adds the new code. The following PR will wire the new code and remove the old code.
---
 .../pinot/core/query/optimizer/QueryOptimizer.java |  68 +++
 .../query/optimizer/filter/FilterOptimizer.java    |  42 ++
 .../filter/FlattenAndOrFilterOptimizer.java        |  88 ++++
 .../optimizer/filter/MergeEqInFilterOptimizer.java | 263 +++++++++++
 .../filter/MergeRangeFilterOptimizer.java          | 354 +++++++++++++++
 .../request/context/predicate/RangePredicate.java  |   2 +-
 .../core/query/optimizer/QueryOptimizerTest.java   | 495 +++++++++++++++++++++
 7 files changed, 1311 insertions(+), 1 deletion(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
new file mode 100644
index 0000000..922fcc2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer;
+
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.optimizer.filter.FilterOptimizer;
+import org.apache.pinot.core.query.optimizer.filter.FlattenAndOrFilterOptimizer;
+import org.apache.pinot.core.query.optimizer.filter.MergeEqInFilterOptimizer;
+import org.apache.pinot.core.query.optimizer.filter.MergeRangeFilterOptimizer;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class QueryOptimizer {
+  private static final List<FilterOptimizer> FILTER_OPTIMIZERS = Arrays
+      .asList(new FlattenAndOrFilterOptimizer(), new MergeEqInFilterOptimizer(), new MergeRangeFilterOptimizer());
+
+  /**
+   * Optimizes the given PQL query.
+   */
+  public void optimize(BrokerRequest brokerRequest, @Nullable Schema schema) {
+    FilterQuery filterQuery = brokerRequest.getFilterQuery();
+    if (filterQuery != null) {
+      FilterQueryTree filterQueryTree =
+          RequestUtils.buildFilterQuery(filterQuery.getId(), brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+      for (FilterOptimizer filterOptimizer : FILTER_OPTIMIZERS) {
+        filterQueryTree = filterOptimizer.optimize(filterQueryTree, schema);
+      }
+      RequestUtils.generateFilterFromTree(filterQueryTree, brokerRequest);
+    }
+  }
+
+  /**
+   * Optimizes the given SQL query.
+   */
+  public void optimize(PinotQuery pinotQuery, @Nullable Schema schema) {
+    Expression filterExpression = pinotQuery.getFilterExpression();
+    if (filterExpression != null) {
+      for (FilterOptimizer filterOptimizer : FILTER_OPTIMIZERS) {
+        filterExpression = filterOptimizer.optimize(filterExpression, schema);
+      }
+      pinotQuery.setFilterExpression(filterExpression);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FilterOptimizer.java
new file mode 100644
index 0000000..e49c9a6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FilterOptimizer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Interface for filter optimizers.
+ * TODO: Support AlwaysTrueFilter and AlwaysFalseFilter
+ */
+public interface FilterOptimizer {
+
+  /**
+   * Optimizes the given filter, returns the optimized filter.
+   */
+  FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema);
+
+  /**
+   * Optimizes the given filter, returns the optimized filter.
+   */
+  Expression optimize(Expression filterExpression, @Nullable Schema schema);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FlattenAndOrFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FlattenAndOrFilterOptimizer.java
new file mode 100644
index 0000000..37677d1
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FlattenAndOrFilterOptimizer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * The {@code FlattenAndOrFilterOptimizer} flattens the nested AND/OR filters. For example, AND(a, AND(b, c)) can
+ * be flattened to AND(a, b, c).
+ */
+public class FlattenAndOrFilterOptimizer implements FilterOptimizer {
+
+  @Override
+  public FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema) {
+    return optimize(filterQueryTree);
+  }
+
+  private FilterQueryTree optimize(FilterQueryTree filterQueryTree) {
+    FilterOperator operator = filterQueryTree.getOperator();
+    if (operator != FilterOperator.AND && operator != FilterOperator.OR) {
+      return filterQueryTree;
+    }
+    List<FilterQueryTree> children = filterQueryTree.getChildren();
+    assert children != null;
+    List<FilterQueryTree> newChildren = new ArrayList<>();
+    for (FilterQueryTree child : children) {
+      FilterQueryTree optimizedChild = optimize(child);
+      if (optimizedChild.getOperator() == operator) {
+        newChildren.addAll(optimizedChild.getChildren());
+      } else {
+        newChildren.add(optimizedChild);
+      }
+    }
+    return new FilterQueryTree(null, null, operator, newChildren);
+  }
+
+  @Override
+  public Expression optimize(Expression filterExpression, @Nullable Schema schema) {
+    return optimize(filterExpression);
+  }
+
+  private Expression optimize(Expression filterExpression) {
+    Function function = filterExpression.getFunctionCall();
+    String operator = function.getOperator();
+    if (!operator.equals(FilterKind.AND.name()) && !operator.equals(FilterKind.OR.name())) {
+      return filterExpression;
+    }
+    List<Expression> children = function.getOperands();
+    assert children != null;
+    List<Expression> newChildren = new ArrayList<>();
+    for (Expression child : children) {
+      Expression optimizedChild = optimize(child);
+      Function childFunction = optimizedChild.getFunctionCall();
+      if (childFunction.getOperator().equals(operator)) {
+        newChildren.addAll(childFunction.getOperands());
+      } else {
+        newChildren.add(optimizedChild);
+      }
+    }
+    function.setOperands(newChildren);
+    return filterExpression;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeEqInFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeEqInFilterOptimizer.java
new file mode 100644
index 0000000..3c7c321
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeEqInFilterOptimizer.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * The {@code MergeEqualInFilterOptimizer} merges EQ and IN predicates on the same column joined by OR, and performs the
+ * following optimizations:
+ * <ul>
+ *   <li>Merge multiple EQ and IN predicates into one IN predicate (or one EQ predicate if possible)</li>
+ *   <li>De-duplicates the values in the IN predicate</li>
+ *   <li>Converts single value IN predicate to EQ predicate</li>
+ *   <li>Pulls up the merged predicate in the absence of other predicates</li>
+ * </ul>
+ *
+ * NOTE: This optimizer follows the {@link FlattenAndOrFilterOptimizer}, so all the AND/OR filters are already
+ *       flattened.
+ */
+public class MergeEqInFilterOptimizer implements FilterOptimizer {
+
+  @Override
+  public FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema) {
+    FilterOperator operator = filterQueryTree.getOperator();
+    if (operator == FilterOperator.OR) {
+      List<FilterQueryTree> children = filterQueryTree.getChildren();
+      Map<String, Set<String>> valuesMap = new HashMap<>();
+      List<FilterQueryTree> newChildren = new ArrayList<>();
+      boolean recreateFilter = false;
+
+      // Iterate over all the child filters to merge EQ and IN predicates
+      for (FilterQueryTree child : children) {
+        FilterOperator childOperator = child.getOperator();
+        assert childOperator != FilterOperator.OR;
+        if (childOperator == FilterOperator.AND) {
+          child.getChildren().replaceAll(c -> optimize(c, schema));
+          newChildren.add(child);
+        } else if (childOperator == FilterOperator.EQUALITY) {
+          String column = child.getColumn();
+          String value = child.getValue().get(0);
+          Set<String> values = valuesMap.get(column);
+          if (values == null) {
+            values = new HashSet<>();
+            values.add(value);
+            valuesMap.put(column, values);
+          } else {
+            values.add(value);
+            // Recreate filter when multiple predicates can be merged
+            recreateFilter = true;
+          }
+        } else if (childOperator == FilterOperator.IN) {
+          String column = child.getColumn();
+          List<String> inPredicateValuesList = child.getValue();
+          Set<String> inPredicateValuesSet = new HashSet<>(inPredicateValuesList);
+          int numUniqueValues = inPredicateValuesSet.size();
+          if (numUniqueValues == 1 || numUniqueValues != inPredicateValuesList.size()) {
+            // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+            // can be de-duplicated
+            recreateFilter = true;
+          }
+          Set<String> values = valuesMap.get(column);
+          if (values == null) {
+            valuesMap.put(column, inPredicateValuesSet);
+          } else {
+            values.addAll(inPredicateValuesSet);
+            // Recreate filter when multiple predicates can be merged
+            recreateFilter = true;
+          }
+        } else {
+          newChildren.add(child);
+        }
+      }
+
+      if (recreateFilter) {
+        if (newChildren.isEmpty() && valuesMap.size() == 1) {
+          // Single predicate without other filters
+          Map.Entry<String, Set<String>> entry = valuesMap.entrySet().iterator().next();
+          return getFilterQueryTree(entry.getKey(), entry.getValue());
+        } else {
+          for (Map.Entry<String, Set<String>> entry : valuesMap.entrySet()) {
+            newChildren.add(getFilterQueryTree(entry.getKey(), entry.getValue()));
+          }
+          return new FilterQueryTree(null, null, FilterOperator.OR, newChildren);
+        }
+      } else {
+        return filterQueryTree;
+      }
+    } else if (operator == FilterOperator.AND) {
+      filterQueryTree.getChildren().replaceAll(c -> optimize(c, schema));
+      return filterQueryTree;
+    } else if (operator == FilterOperator.IN) {
+      String column = filterQueryTree.getColumn();
+      List<String> valuesList = filterQueryTree.getValue();
+      Set<String> values = new HashSet<>(valuesList);
+      int numUniqueValues = values.size();
+      if (numUniqueValues == 1 || numUniqueValues != valuesList.size()) {
+        // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+        // can be de-duplicated
+        return getFilterQueryTree(column, values);
+      } else {
+        return filterQueryTree;
+      }
+    } else {
+      return filterQueryTree;
+    }
+  }
+
+  /**
+   * Helper method to construct a EQ or IN predicate FilterQueryTree from the given column and values.
+   */
+  private static FilterQueryTree getFilterQueryTree(String column, Set<String> values) {
+    return new FilterQueryTree(column, new ArrayList<>(values),
+        values.size() == 1 ? FilterOperator.EQUALITY : FilterOperator.IN, null);
+  }
+
+  @Override
+  public Expression optimize(Expression filterExpression, @Nullable Schema schema) {
+    Function function = filterExpression.getFunctionCall();
+    String operator = function.getOperator();
+    if (operator.equals(FilterKind.OR.name())) {
+      List<Expression> children = function.getOperands();
+      Map<Expression, Set<Expression>> valuesMap = new HashMap<>();
+      List<Expression> newChildren = new ArrayList<>();
+      boolean recreateFilter = false;
+
+      // Iterate over all the child filters to merge EQ and IN predicates
+      for (Expression child : children) {
+        Function childFunction = child.getFunctionCall();
+        String childOperator = childFunction.getOperator();
+        assert !childOperator.equals(FilterKind.OR.name());
+        if (childOperator.equals(FilterKind.AND.name())) {
+          childFunction.getOperands().replaceAll(o -> optimize(o, schema));
+          newChildren.add(child);
+        } else if (childOperator.equals(FilterKind.EQUALS.name())) {
+          List<Expression> operands = childFunction.getOperands();
+          Expression lhs = operands.get(0);
+          Expression value = operands.get(1);
+          Set<Expression> values = valuesMap.get(lhs);
+          if (values == null) {
+            values = new HashSet<>();
+            values.add(value);
+            valuesMap.put(lhs, values);
+          } else {
+            values.add(value);
+            // Recreate filter when multiple predicates can be merged
+            recreateFilter = true;
+          }
+        } else if (childOperator.equals(FilterKind.IN.name())) {
+          List<Expression> operands = childFunction.getOperands();
+          Expression lhs = operands.get(0);
+          Set<Expression> inPredicateValuesSet = new HashSet<>();
+          int numOperands = operands.size();
+          for (int i = 1; i < numOperands; i++) {
+            inPredicateValuesSet.add(operands.get(i));
+          }
+          int numUniqueValues = inPredicateValuesSet.size();
+          if (numUniqueValues == 1 || numUniqueValues != numOperands - 1) {
+            // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+            // can be de-duplicated
+            recreateFilter = true;
+          }
+          Set<Expression> values = valuesMap.get(lhs);
+          if (values == null) {
+            valuesMap.put(lhs, inPredicateValuesSet);
+          } else {
+            values.addAll(inPredicateValuesSet);
+            // Recreate filter when multiple predicates can be merged
+            recreateFilter = true;
+          }
+        } else {
+          newChildren.add(child);
+        }
+      }
+
+      if (recreateFilter) {
+        if (newChildren.isEmpty() && valuesMap.size() == 1) {
+          // Single range without other filters
+          Map.Entry<Expression, Set<Expression>> entry = valuesMap.entrySet().iterator().next();
+          return getFilterExpression(entry.getKey(), entry.getValue());
+        } else {
+          for (Map.Entry<Expression, Set<Expression>> entry : valuesMap.entrySet()) {
+            newChildren.add(getFilterExpression(entry.getKey(), entry.getValue()));
+          }
+          function.setOperands(newChildren);
+          return filterExpression;
+        }
+      } else {
+        return filterExpression;
+      }
+    } else if (operator.equals(FilterKind.AND.name())) {
+      function.getOperands().replaceAll(c -> optimize(c, schema));
+      return filterExpression;
+    } else if (operator.equals(FilterKind.IN.name())) {
+      List<Expression> operands = function.getOperands();
+      Expression lhs = operands.get(0);
+      Set<Expression> values = new HashSet<>();
+      int numOperands = operands.size();
+      for (int i = 1; i < numOperands; i++) {
+        values.add(operands.get(i));
+      }
+      int numUniqueValues = values.size();
+      if (numUniqueValues == 1 || numUniqueValues != numOperands - 1) {
+        // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+        // can be de-duplicated
+        return getFilterExpression(lhs, values);
+      } else {
+        return filterExpression;
+      }
+    } else {
+      return filterExpression;
+    }
+  }
+
+  /**
+   * Helper method to construct a EQ or IN predicate filter Expression from the given lhs and values.
+   */
+  private static Expression getFilterExpression(Expression lhs, Set<Expression> values) {
+    int numValues = values.size();
+    if (numValues == 1) {
+      Expression eqFilter = RequestUtils.getFunctionExpression(FilterKind.EQUALS.name());
+      eqFilter.getFunctionCall().setOperands(Arrays.asList(lhs, values.iterator().next()));
+      return eqFilter;
+    } else {
+      Expression inFilter = RequestUtils.getFunctionExpression(FilterKind.IN.name());
+      List<Expression> operands = new ArrayList<>(numValues + 1);
+      operands.add(lhs);
+      operands.addAll(values);
+      inFilter.getFunctionCall().setOperands(operands);
+      return inFilter;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java
new file mode 100644
index 0000000..2a01395
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+
+/**
+ * The {@code MergeRangeFilterOptimizer} merges multiple RANGE predicates on the same column joined by AND by taking
+ * their intersection. It also pulls up the merged predicate in the absence of other predicates.
+ *
+ * NOTE: This optimizer follows the {@link FlattenAndOrFilterOptimizer}, so all the AND/OR filters are already
+ *       flattened.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MergeRangeFilterOptimizer implements FilterOptimizer {
+
+  @Override
+  public FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema) {
+    if (schema == null) {
+      return filterQueryTree;
+    }
+    FilterOperator operator = filterQueryTree.getOperator();
+    if (operator == FilterOperator.AND) {
+      List<FilterQueryTree> children = filterQueryTree.getChildren();
+      Map<String, Range> rangeMap = new HashMap<>();
+      List<FilterQueryTree> newChildren = new ArrayList<>();
+      boolean recreateFilter = false;
+
+      // Iterate over all the child filters to create and merge ranges
+      for (FilterQueryTree child : children) {
+        FilterOperator childOperator = child.getOperator();
+        assert childOperator != FilterOperator.AND;
+        if (childOperator == FilterOperator.OR) {
+          child.getChildren().replaceAll(c -> optimize(c, schema));
+          newChildren.add(child);
+        } else if (childOperator == FilterOperator.RANGE) {
+          String column = child.getColumn();
+          FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+          if (fieldSpec == null || !fieldSpec.isSingleValueField()) {
+            // Skip optimizing transform expression and multi-value column
+            // NOTE: We cannot optimize multi-value column because [0, 10] will match filter "col < 1 AND col > 9", but
+            //       not the merged one.
+            newChildren.add(child);
+            continue;
+          }
+          // Create a range and merge with current range if exists
+          Range range = getRange(child.getValue().get(0), fieldSpec.getDataType());
+          Range currentRange = rangeMap.get(column);
+          if (currentRange == null) {
+            rangeMap.put(column, range);
+          } else {
+            currentRange.intersect(range);
+            recreateFilter = true;
+          }
+        } else {
+          newChildren.add(child);
+        }
+      }
+
+      if (recreateFilter) {
+        if (newChildren.isEmpty() && rangeMap.size() == 1) {
+          // Single range without other filters
+          Map.Entry<String, Range> entry = rangeMap.entrySet().iterator().next();
+          return getRangeFilterQueryTree(entry.getKey(), entry.getValue());
+        } else {
+          for (Map.Entry<String, Range> entry : rangeMap.entrySet()) {
+            newChildren.add(getRangeFilterQueryTree(entry.getKey(), entry.getValue()));
+          }
+          return new FilterQueryTree(null, null, FilterOperator.AND, newChildren);
+        }
+      } else {
+        return filterQueryTree;
+      }
+    } else if (operator == FilterOperator.OR) {
+      filterQueryTree.getChildren().replaceAll(c -> optimize(c, schema));
+      return filterQueryTree;
+    } else {
+      return filterQueryTree;
+    }
+  }
+
+  /**
+   * Helper method to create a Range from the given string representation of the range and data type. See
+   * {@link RangePredicate} for details.
+   */
+  private static Range getRange(String rangeString, DataType dataType) {
+    String[] split = StringUtils.split(rangeString, RangePredicate.LEGACY_DELIMITER);
+    String lower = split[0];
+    boolean lowerInclusive = lower.charAt(0) == RangePredicate.LOWER_INCLUSIVE;
+    String stringLowerBound = lower.substring(1);
+    Comparable lowerBound =
+        stringLowerBound.equals(RangePredicate.UNBOUNDED) ? null : getComparable(stringLowerBound, dataType);
+    String upper = split[1];
+    int upperLength = upper.length();
+    boolean upperInclusive = upper.charAt(upperLength - 1) == RangePredicate.UPPER_INCLUSIVE;
+    String stringUpperBound = upper.substring(0, upperLength - 1);
+    Comparable upperBound =
+        stringUpperBound.equals(RangePredicate.UNBOUNDED) ? null : getComparable(stringUpperBound, dataType);
+    return new Range(lowerBound, lowerInclusive, upperBound, upperInclusive);
+  }
+
+  /**
+   * Helper method to create a Comparable from the given string value and data type.
+   */
+  private static Comparable getComparable(String stringValue, DataType dataType) {
+    switch (dataType) {
+      case INT:
+        return Integer.parseInt(stringValue);
+      case LONG:
+        return Long.parseLong(stringValue);
+      case FLOAT:
+        return Float.parseFloat(stringValue);
+      case DOUBLE:
+        return Double.parseDouble(stringValue);
+      case STRING:
+        return stringValue;
+      case BYTES:
+        return BytesUtils.toByteArray(stringValue);
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Helper method to construct a RANGE predicate FilterQueryTree from the given column and range.
+   */
+  private static FilterQueryTree getRangeFilterQueryTree(String column, Range range) {
+    return new FilterQueryTree(column, Collections.singletonList(range.getRangeString()), FilterOperator.RANGE, null);
+  }
+
+  @Override
+  public Expression optimize(Expression filterExpression, @Nullable Schema schema) {
+    if (schema == null) {
+      return filterExpression;
+    }
+    Function function = filterExpression.getFunctionCall();
+    String operator = function.getOperator();
+    if (operator.equals(FilterKind.AND.name())) {
+      List<Expression> children = function.getOperands();
+      Map<String, Range> rangeMap = new HashMap<>();
+      List<Expression> newChildren = new ArrayList<>();
+      boolean recreateFilter = false;
+
+      // Iterate over all the child filters to create and merge ranges
+      for (Expression child : children) {
+        Function childFunction = child.getFunctionCall();
+        FilterKind filterKind = FilterKind.valueOf(childFunction.getOperator());
+        assert filterKind != FilterKind.AND;
+        if (filterKind == FilterKind.OR) {
+          childFunction.getOperands().replaceAll(o -> optimize(o, schema));
+          newChildren.add(child);
+        } else if (filterKind.isRange()) {
+          List<Expression> operands = childFunction.getOperands();
+          Expression lhs = operands.get(0);
+          if (lhs.getType() != ExpressionType.IDENTIFIER) {
+            // Skip optimizing transform expression
+            newChildren.add(child);
+            continue;
+          }
+          String column = lhs.getIdentifier().getName();
+          FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+          if (fieldSpec == null || !fieldSpec.isSingleValueField()) {
+            // Skip optimizing multi-value column
+            // NOTE: We cannot optimize multi-value column because [0, 10] will match filter "col < 1 AND col > 9", but
+            //       not the merged one.
+            newChildren.add(child);
+            continue;
+          }
+          // Create a range and merge with current range if exists
+          DataType dataType = fieldSpec.getDataType();
+          Range range = getRange(filterKind, operands, dataType);
+          Range currentRange = rangeMap.get(column);
+          if (currentRange == null) {
+            rangeMap.put(column, range);
+          } else {
+            currentRange.intersect(range);
+            recreateFilter = true;
+          }
+        } else {
+          newChildren.add(child);
+        }
+      }
+
+      if (recreateFilter) {
+        if (newChildren.isEmpty() && rangeMap.size() == 1) {
+          // Single range without other filters
+          Map.Entry<String, Range> entry = rangeMap.entrySet().iterator().next();
+          return getRangeFilterExpression(entry.getKey(), entry.getValue());
+        } else {
+          for (Map.Entry<String, Range> entry : rangeMap.entrySet()) {
+            newChildren.add(getRangeFilterExpression(entry.getKey(), entry.getValue()));
+          }
+          function.setOperands(newChildren);
+          return filterExpression;
+        }
+      } else {
+        return filterExpression;
+      }
+    } else if (operator.equals(FilterKind.OR.name())) {
+      function.getOperands().replaceAll(c -> optimize(c, schema));
+      return filterExpression;
+    } else {
+      return filterExpression;
+    }
+  }
+
+  /**
+   * Helper method to create a Range from the given filter kind, operands and data type.
+   */
+  private static Range getRange(FilterKind filterKind, List<Expression> operands, DataType dataType) {
+    switch (filterKind) {
+      case GREATER_THAN:
+        return new Range(getComparable(operands.get(1), dataType), false, null, false);
+      case GREATER_THAN_OR_EQUAL:
+        return new Range(getComparable(operands.get(1), dataType), true, null, false);
+      case LESS_THAN:
+        return new Range(null, false, getComparable(operands.get(1), dataType), false);
+      case LESS_THAN_OR_EQUAL:
+        return new Range(null, false, getComparable(operands.get(1), dataType), true);
+      case BETWEEN:
+        return new Range(getComparable(operands.get(1), dataType), true, getComparable(operands.get(2), dataType),
+            true);
+      default:
+        throw new IllegalStateException("Unsupported filter kind: " + filterKind);
+    }
+  }
+
+  /**
+   * Helper method to create a Comparable from the given literal expression and data type.
+   */
+  private static Comparable getComparable(Expression literalExpression, DataType dataType) {
+    return getComparable(literalExpression.getLiteral().getFieldValue().toString(), dataType);
+  }
+
+  /**
+   * Helper method to construct a RANGE predicate filter Expression from the given column and range.
+   */
+  private static Expression getRangeFilterExpression(String column, Range range) {
+    Expression rangeFilter = RequestUtils.getFunctionExpression(FilterKind.RANGE.name());
+    rangeFilter.getFunctionCall().setOperands(Arrays.asList(RequestUtils.createIdentifierExpression(column),
+        RequestUtils.getLiteralExpression(range.getRangeString())));
+    return rangeFilter;
+  }
+
+  /**
+   * Helper class to represent a value range.
+   */
+  private static class Range {
+    Comparable _lowerBound;
+    boolean _lowerInclusive;
+    Comparable _upperBound;
+    boolean _upperInclusive;
+
+    Range(@Nullable Comparable lowerBound, boolean lowerInclusive, @Nullable Comparable upperBound,
+        boolean upperInclusive) {
+      _lowerBound = lowerBound;
+      _lowerInclusive = lowerInclusive;
+      _upperBound = upperBound;
+      _upperInclusive = upperInclusive;
+    }
+
+    /**
+     * Intersects the current range with another range.
+     */
+    void intersect(Range range) {
+      if (range._lowerBound != null) {
+        if (_lowerBound == null) {
+          _lowerInclusive = range._lowerInclusive;
+          _lowerBound = range._lowerBound;
+        } else {
+          int result = _lowerBound.compareTo(range._lowerBound);
+          if (result < 0) {
+            _lowerBound = range._lowerBound;
+            _lowerInclusive = range._lowerInclusive;
+          } else if (result == 0) {
+            _lowerInclusive &= range._lowerInclusive;
+          }
+        }
+      }
+      if (range._upperBound != null) {
+        if (_upperBound == null) {
+          _upperInclusive = range._upperInclusive;
+          _upperBound = range._upperBound;
+        } else {
+          int result = _upperBound.compareTo(range._upperBound);
+          if (result > 0) {
+            _upperBound = range._upperBound;
+            _upperInclusive = range._upperInclusive;
+          } else if (result == 0) {
+            _upperInclusive &= range._upperInclusive;
+          }
+        }
+      }
+    }
+
+    /**
+     * Returns the string representation of the range. See {@link RangePredicate} for details.
+     */
+    String getRangeString() {
+      StringBuilder stringBuilder = new StringBuilder();
+      if (_lowerBound == null) {
+        stringBuilder.append(RangePredicate.LOWER_EXCLUSIVE).append(RangePredicate.UNBOUNDED);
+      } else {
+        stringBuilder.append(_lowerInclusive ? RangePredicate.LOWER_INCLUSIVE : RangePredicate.LOWER_EXCLUSIVE);
+        stringBuilder.append(_lowerBound.toString());
+      }
+      // TODO: Switch to RangePredicate.DELIMITER after releasing 0.6.0
+      stringBuilder.append(RangePredicate.LEGACY_DELIMITER);
+      if (_upperBound == null) {
+        stringBuilder.append(RangePredicate.UNBOUNDED).append(RangePredicate.UPPER_EXCLUSIVE);
+      } else {
+        stringBuilder.append(_upperBound.toString());
+        stringBuilder.append(_upperInclusive ? RangePredicate.UPPER_INCLUSIVE : RangePredicate.UPPER_EXCLUSIVE);
+      }
+      return stringBuilder.toString();
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java
index d2b5b26..c80bbec 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
  */
 public class RangePredicate implements Predicate {
   public static final char DELIMITER = '\0';
-  // TODO: Remove the legacy delimiter after releasing 0.5.0
+  // TODO: Remove the legacy delimiter after releasing 0.6.0
   public static final String LEGACY_DELIMITER = "\t\t";
   public static final char LOWER_INCLUSIVE = '[';
   public static final char LOWER_EXCLUSIVE = '(';
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/QueryOptimizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/QueryOptimizerTest.java
new file mode 100644
index 0000000..d37bdfc
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/QueryOptimizerTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class QueryOptimizerTest {
+  private static final QueryOptimizer OPTIMIZER = new QueryOptimizer();
+  private static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
+  private static final Pql2Compiler PQL_COMPILER = new Pql2Compiler();
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("int", DataType.INT)
+          .addSingleValueDimension("long", DataType.LONG).addSingleValueDimension("float", DataType.FLOAT)
+          .addSingleValueDimension("double", DataType.DOUBLE).addSingleValueDimension("string", DataType.STRING)
+          .addSingleValueDimension("bytes", DataType.BYTES).addMultiValueDimension("mvInt", DataType.INT).build();
+
+  @Test
+  public void testNoFilter() {
+    String query = "SELECT * FROM testTable";
+
+    BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
+    PinotQuery pinotQuery = sqlBrokerRequest.getPinotQuery();
+    BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
+
+    OPTIMIZER.optimize(sqlBrokerRequest, SCHEMA);
+    assertNull(sqlBrokerRequest.getFilterQuery());
+
+    OPTIMIZER.optimize(pinotQuery, SCHEMA);
+    assertNull(pinotQuery.getFilterExpression());
+
+    OPTIMIZER.optimize(pqlBrokerRequest, SCHEMA);
+    assertNull(pqlBrokerRequest.getFilterQuery());
+  }
+
+  @Test
+  public void testFlattenAndOrFilter() {
+    String query =
+        "SELECT * FROM testTable WHERE ((int = 4 OR (long = 5 AND (float = 9 AND double = 7.5))) OR string = 'foo') OR bytes = 'abc'";
+
+    BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
+    PinotQuery pinotQuery = sqlBrokerRequest.getPinotQuery();
+    BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
+
+    for (BrokerRequest brokerRequest : Arrays.asList(sqlBrokerRequest, pqlBrokerRequest)) {
+      OPTIMIZER.optimize(brokerRequest, SCHEMA);
+      FilterQueryTree filterQueryTree = RequestUtils.buildFilterQuery(brokerRequest.getFilterQuery().getId(),
+          brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+      assertEquals(filterQueryTree.getOperator(), FilterOperator.OR);
+      List<FilterQueryTree> children = filterQueryTree.getChildren();
+      assertEquals(children.size(), 4);
+      assertEquals(children.get(0).toString(), "int EQUALITY [4]");
+      assertEquals(children.get(2).toString(), "string EQUALITY [foo]");
+      assertEquals(children.get(3).toString(), "bytes EQUALITY [abc]");
+
+      FilterQueryTree andFilter = children.get(1);
+      assertEquals(andFilter.getOperator(), FilterOperator.AND);
+      List<FilterQueryTree> andFilterChildren = andFilter.getChildren();
+      assertEquals(andFilterChildren.size(), 3);
+      assertEquals(andFilterChildren.get(0).toString(), "long EQUALITY [5]");
+      assertEquals(andFilterChildren.get(1).toString(), "float EQUALITY [9]");
+      assertEquals(andFilterChildren.get(2).toString(), "double EQUALITY [7.5]");
+    }
+
+    OPTIMIZER.optimize(pinotQuery, SCHEMA);
+    Function filterFunction = pinotQuery.getFilterExpression().getFunctionCall();
+    assertEquals(filterFunction.getOperator(), FilterKind.OR.name());
+    List<Expression> children = filterFunction.getOperands();
+    assertEquals(children.size(), 4);
+    assertEquals(children.get(0), getEqFilterExpression("int", 4));
+    assertEquals(children.get(2), getEqFilterExpression("string", "foo"));
+    assertEquals(children.get(3), getEqFilterExpression("bytes", "abc"));
+
+    Function secondChildFunction = children.get(1).getFunctionCall();
+    assertEquals(secondChildFunction.getOperator(), FilterKind.AND.name());
+    List<Expression> secondChildChildren = secondChildFunction.getOperands();
+    assertEquals(secondChildChildren.size(), 3);
+    assertEquals(secondChildChildren.get(0), getEqFilterExpression("long", 5));
+    assertEquals(secondChildChildren.get(1), getEqFilterExpression("float", 9));
+    assertEquals(secondChildChildren.get(2), getEqFilterExpression("double", 7.5));
+  }
+
+  private static Expression getEqFilterExpression(String column, Object value) {
+    Expression eqFilterExpression = RequestUtils.getFunctionExpression(FilterKind.EQUALS.name());
+    eqFilterExpression.getFunctionCall().setOperands(
+        Arrays.asList(RequestUtils.getIdentifierExpression(column), RequestUtils.getLiteralExpression(value)));
+    return eqFilterExpression;
+  }
+
+  @Test
+  public void testMergeEqInFilter() {
+    String query =
+        "SELECT * FROM testTable WHERE int IN (1, 1) AND (long IN (2, 3) OR long IN (3, 4) OR long = 2) AND (float = 3.5 OR double IN (1.1, 1.2) OR float = 4.5 OR float > 5.5 OR double = 1.3)";
+
+    BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
+    PinotQuery pinotQuery = sqlBrokerRequest.getPinotQuery();
+    BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
+
+    for (BrokerRequest brokerRequest : Arrays.asList(sqlBrokerRequest, pqlBrokerRequest)) {
+      OPTIMIZER.optimize(brokerRequest, SCHEMA);
+      FilterQueryTree filterQueryTree = RequestUtils.buildFilterQuery(brokerRequest.getFilterQuery().getId(),
+          brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+      assertEquals(filterQueryTree.getOperator(), FilterOperator.AND);
+      List<FilterQueryTree> children = filterQueryTree.getChildren();
+      assertEquals(children.size(), 3);
+      assertEquals(children.get(0).toString(), "int EQUALITY [1]");
+
+      FilterQueryTree secondChild = children.get(1);
+      assertEquals(secondChild.getColumn(), "long");
+      assertEquals(secondChild.getOperator(), FilterOperator.IN);
+      assertEqualsNoOrder(secondChild.getValue().toArray(), new Object[]{"2", "3", "4"});
+
+      FilterQueryTree thirdChild = children.get(2);
+      assertEquals(thirdChild.getOperator(), FilterOperator.OR);
+      List<FilterQueryTree> orFilterChildren = thirdChild.getChildren();
+      assertEquals(orFilterChildren.size(), 3);
+      assertEquals(orFilterChildren.get(0).toString(), "float RANGE [(5.5\t\t*)]");
+
+      // Order of second and third child is not deterministic
+      FilterQueryTree secondOrFilterChild = orFilterChildren.get(1);
+      assertEquals(secondOrFilterChild.getOperator(), FilterOperator.IN);
+      FilterQueryTree thirdOrFilterChild = orFilterChildren.get(2);
+      assertEquals(thirdOrFilterChild.getOperator(), FilterOperator.IN);
+      if (secondOrFilterChild.getColumn().equals("float")) {
+        assertEqualsNoOrder(secondOrFilterChild.getValue().toArray(), new Object[]{"3.5", "4.5"});
+        assertEquals(thirdOrFilterChild.getColumn(), "double");
+        assertEqualsNoOrder(thirdOrFilterChild.getValue().toArray(), new Object[]{"1.1", "1.2", "1.3"});
+      } else {
+        assertEquals(secondOrFilterChild.getColumn(), "double");
+        assertEqualsNoOrder(secondOrFilterChild.getValue().toArray(), new Object[]{"1.1", "1.2", "1.3"});
+        assertEquals(thirdOrFilterChild.getColumn(), "float");
+        assertEqualsNoOrder(thirdOrFilterChild.getValue().toArray(), new Object[]{"3.5", "4.5"});
+      }
+    }
+
+    OPTIMIZER.optimize(pinotQuery, SCHEMA);
+    Function filterFunction = pinotQuery.getFilterExpression().getFunctionCall();
+    assertEquals(filterFunction.getOperator(), FilterKind.AND.name());
+    List<Expression> children = filterFunction.getOperands();
+    assertEquals(children.size(), 3);
+    assertEquals(children.get(0), getEqFilterExpression("int", 1));
+    checkInFilterFunction(children.get(1).getFunctionCall(), "long", Arrays.asList(2, 3, 4));
+
+    Function thirdChildFunction = children.get(2).getFunctionCall();
+    assertEquals(thirdChildFunction.getOperator(), FilterKind.OR.name());
+    List<Expression> thirdChildChildren = thirdChildFunction.getOperands();
+    assertEquals(thirdChildChildren.size(), 3);
+    assertEquals(thirdChildChildren.get(0).getFunctionCall().getOperator(), FilterKind.GREATER_THAN.name());
+
+    // Order of second and third child is not deterministic
+    Function secondGrandChildFunction = thirdChildChildren.get(1).getFunctionCall();
+    assertEquals(secondGrandChildFunction.getOperator(), FilterKind.IN.name());
+    Function thirdGrandChildFunction = thirdChildChildren.get(2).getFunctionCall();
+    assertEquals(thirdGrandChildFunction.getOperator(), FilterKind.IN.name());
+    if (secondGrandChildFunction.getOperands().get(0).getIdentifier().getName().equals("float")) {
+      checkInFilterFunction(secondGrandChildFunction, "float", Arrays.asList(3.5, 4.5));
+      checkInFilterFunction(thirdGrandChildFunction, "double", Arrays.asList(1.1, 1.2, 1.3));
+    } else {
+      checkInFilterFunction(secondGrandChildFunction, "double", Arrays.asList(1.1, 1.2, 1.3));
+      checkInFilterFunction(thirdGrandChildFunction, "float", Arrays.asList(3.5, 4.5));
+    }
+  }
+
+  private static void checkInFilterFunction(Function inFilterFunction, String column, List<Object> values) {
+    assertEquals(inFilterFunction.getOperator(), FilterKind.IN.name());
+    List<Expression> operands = inFilterFunction.getOperands();
+    int numOperands = operands.size();
+    assertEquals(numOperands, values.size() + 1);
+    assertEquals(operands.get(0).getIdentifier().getName(), column);
+    Set<Expression> valueExpressions = new HashSet<>();
+    for (Object value : values) {
+      valueExpressions.add(RequestUtils.getLiteralExpression(value));
+    }
+    for (int i = 1; i < numOperands; i++) {
+      assertTrue(valueExpressions.contains(operands.get(i)));
+    }
+  }
+
+  @Test
+  public void testMergeRangeFilter() {
+    String query =
+        "SELECT * FROM testTable WHERE (int > 10 AND int <= 100 AND int BETWEEN 10 AND 20) OR (float BETWEEN 5.5 AND 7.5 AND float = 6 AND float < 6.5 AND float BETWEEN 6 AND 8) OR (string > '123' AND string > '23') OR (mvInt > 5 AND mvInt < 0)";
+
+    BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
+    PinotQuery pinotQuery = sqlBrokerRequest.getPinotQuery();
+    BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
+
+    for (BrokerRequest brokerRequest : Arrays.asList(sqlBrokerRequest, pqlBrokerRequest)) {
+      OPTIMIZER.optimize(brokerRequest, SCHEMA);
+      FilterQueryTree filterQueryTree = RequestUtils.buildFilterQuery(brokerRequest.getFilterQuery().getId(),
+          brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+      assertEquals(filterQueryTree.getOperator(), FilterOperator.OR);
+      List<FilterQueryTree> children = filterQueryTree.getChildren();
+      assertEquals(children.size(), 4);
+      assertEquals(children.get(0).toString(), "int RANGE [(10\t\t20]]");
+      // Alphabetical order for STRING column ('23' > '123')
+      assertEquals(children.get(2).toString(), "string RANGE [(23\t\t*)]");
+
+      FilterQueryTree secondChild = children.get(1);
+      assertEquals(secondChild.getOperator(), FilterOperator.AND);
+      assertEquals(secondChild.getChildren().size(), 2);
+      assertEquals(secondChild.getChildren().get(0).toString(), "float EQUALITY [6]");
+      assertEquals(secondChild.getChildren().get(1).toString(), "float RANGE [[6.0\t\t6.5)]");
+
+      // Range filter on multi-value column should not be merged ([-5, 10] can match this filter)
+      FilterQueryTree fourthChild = children.get(3);
+      assertEquals(fourthChild.getOperator(), FilterOperator.AND);
+      assertEquals(fourthChild.getChildren().size(), 2);
+      assertEquals(fourthChild.getChildren().get(0).toString(), "mvInt RANGE [(5\t\t*)]");
+      assertEquals(fourthChild.getChildren().get(1).toString(), "mvInt RANGE [(*\t\t0)]");
+    }
+
+    OPTIMIZER.optimize(pinotQuery, SCHEMA);
+    Function filterFunction = pinotQuery.getFilterExpression().getFunctionCall();
+    assertEquals(filterFunction.getOperator(), FilterKind.OR.name());
+    List<Expression> operands = filterFunction.getOperands();
+    assertEquals(operands.size(), 4);
+    assertEquals(operands.get(0), getRangeFilterExpression("int", "(10\t\t20]"));
+    // Alphabetical order for STRING column ('23' > '123')
+    assertEquals(operands.get(2), getRangeFilterExpression("string", "(23\t\t*)"));
+
+    Function secondChildFunction = operands.get(1).getFunctionCall();
+    assertEquals(secondChildFunction.getOperator(), FilterKind.AND.name());
+    List<Expression> secondChildChildren = secondChildFunction.getOperands();
+    assertEquals(secondChildChildren.size(), 2);
+    assertEquals(secondChildChildren.get(0), getEqFilterExpression("float", 6));
+    assertEquals(secondChildChildren.get(1), getRangeFilterExpression("float", "[6.0\t\t6.5)"));
+
+    // Range filter on multi-value column should not be merged ([-5, 10] can match this filter)
+    Function fourthChildFunction = operands.get(3).getFunctionCall();
+    assertEquals(fourthChildFunction.getOperator(), FilterKind.AND.name());
+    List<Expression> fourthChildChildren = fourthChildFunction.getOperands();
+    assertEquals(fourthChildChildren.size(), 2);
+    assertEquals(fourthChildChildren.get(0).getFunctionCall().getOperator(), FilterKind.GREATER_THAN.name());
+    assertEquals(fourthChildChildren.get(1).getFunctionCall().getOperator(), FilterKind.LESS_THAN.name());
+  }
+
+  private static Expression getRangeFilterExpression(String column, String rangeString) {
+    Expression rangeFilterExpression = RequestUtils.getFunctionExpression(FilterKind.RANGE.name());
+    rangeFilterExpression.getFunctionCall().setOperands(
+        Arrays.asList(RequestUtils.getIdentifierExpression(column), RequestUtils.getLiteralExpression(rangeString)));
+    return rangeFilterExpression;
+  }
+
+  @Test
+  public void testQueries() {
+    // MergeEqInFilter
+    testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 2 OR int = 3",
+        "SELECT * FROM testTable WHERE int IN (1, 2, 3)");
+    testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 2 OR int = 3 AND long = 4",
+        "SELECT * FROM testTable WHERE int IN (1, 2) OR (int = 3 AND long = 4)");
+    testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 2 OR int = 3 OR long = 4 OR long = 5 OR long = 6",
+        "SELECT * FROM testTable WHERE int IN (1, 2, 3) OR long IN (4, 5, 6)");
+    testQuery("SELECT * FROM testTable WHERE int = 1 OR long = 4 OR int = 2 OR long = 5 OR int = 3 OR long = 6",
+        "SELECT * FROM testTable WHERE int IN (1, 2, 3) OR long IN (4, 5, 6)");
+    testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 1", "SELECT * FROM testTable WHERE int = 1");
+    testQuery("SELECT * FROM testTable WHERE (int = 1 OR int = 1) AND long = 2",
+        "SELECT * FROM testTable WHERE int = 1 AND long = 2");
+    testQuery("SELECT * FROM testTable WHERE int = 1 OR int IN (2, 3, 4, 5)",
+        "SELECT * FROM testTable WHERE int IN (1, 2, 3, 4, 5)");
+    testQuery("SELECT * FROM testTable WHERE int IN (1, 1) OR int = 1", "SELECT * FROM testTable WHERE int = 1");
+    testQuery("SELECT * FROM testTable WHERE string = 'foo' OR string = 'bar' OR string = 'foobar'",
+        "SELECT * FROM testTable WHERE string IN ('foo', 'bar', 'foobar')");
+    testQuery("SELECT * FROM testTable WHERE bytes = 'dead' OR bytes = 'beef' OR bytes = 'deadbeef'",
+        "SELECT * FROM testTable WHERE bytes IN ('dead', 'beef', 'deadbeef')");
+
+    // MergeRangeFilter
+    testQuery("SELECT * FROM testTable WHERE int >= 10 AND int <= 20",
+        "SELECT * FROM testTable WHERE int BETWEEN 10 AND 20");
+    testQuery("SELECT * FROM testTable WHERE int BETWEEN 10 AND 20 AND int > 7 AND int <= 17 OR int > 20",
+        "SELECT * FROM testTable WHERE int BETWEEN 10 AND 17 OR int > 20");
+    testQuery("SELECT * FROM testTable WHERE long BETWEEN 10 AND 20 AND long > 7 AND long <= 17 OR long > 20",
+        "SELECT * FROM testTable WHERE long BETWEEN 10 AND 17 OR long > 20");
+    testQuery("SELECT * FROM testTable WHERE float BETWEEN 10.5 AND 20 AND float > 7 AND float <= 17.5 OR float > 20",
+        "SELECT * FROM testTable WHERE float BETWEEN 10.5 AND 17.5 OR float > 20");
+    testQuery(
+        "SELECT * FROM testTable WHERE double BETWEEN 10.5 AND 20 AND double > 7 AND double <= 17.5 OR double > 20",
+        "SELECT * FROM testTable WHERE double BETWEEN 10.5 AND 17.5 OR double > 20");
+    testQuery(
+        "SELECT * FROM testTable WHERE string BETWEEN '10' AND '20' AND string > '7' AND string <= '17' OR string > '20'",
+        "SELECT * FROM testTable WHERE string > '7' AND string <= '17' OR string > '20'");
+    testQuery(
+        "SELECT * FROM testTable WHERE bytes BETWEEN '10' AND '20' AND bytes > '07' AND bytes <= '17' OR bytes > '20'",
+        "SELECT * FROM testTable WHERE bytes BETWEEN '10' AND '17' OR bytes > '20'");
+    testQuery(
+        "SELECT * FROM testTable WHERE int > 10 AND long > 20 AND int <= 30 AND long <= 40 AND int >= 15 AND long >= 25",
+        "SELECT * FROM testTable WHERE int BETWEEN 15 AND 30 AND long BETWEEN 25 AND 40");
+    testQuery("SELECT * FROM testTable WHERE int > 10 AND int > 20 OR int < 30 AND int < 40",
+        "SELECT * FROM testTable WHERE int > 20 OR int < 30");
+    testQuery("SELECT * FROM testTable WHERE int > 10 AND int > 20 OR long < 30 AND long < 40",
+        "SELECT * FROM testTable WHERE int > 20 OR long < 30");
+
+    // Mixed
+    testQuery(
+        "SELECT * FROM testTable WHERE int >= 20 AND (int > 10 AND (int IN (1, 2) OR (int = 2 OR int = 3)) AND int <= 30)",
+        "SELECT * FROM testTable WHERE int BETWEEN 20 AND 30 AND int IN (1, 2, 3)");
+  }
+
+  private static void testQuery(String actual, String expected) {
+    BrokerRequest actualSqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(actual);
+    OPTIMIZER.optimize(actualSqlBrokerRequest, SCHEMA);
+    PinotQuery actualPinotQuery = actualSqlBrokerRequest.getPinotQuery();
+    OPTIMIZER.optimize(actualPinotQuery, SCHEMA);
+    BrokerRequest actualPqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(actual);
+    OPTIMIZER.optimize(actualPqlBrokerRequest, SCHEMA);
+
+    // Also optimize the expected query because the expected range can only be generate via optimizer
+    BrokerRequest expectedSqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(expected);
+    OPTIMIZER.optimize(expectedSqlBrokerRequest, SCHEMA);
+    PinotQuery expectedPinotQuery = expectedSqlBrokerRequest.getPinotQuery();
+    OPTIMIZER.optimize(expectedPinotQuery, SCHEMA);
+    BrokerRequest expectedPqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(expected);
+    OPTIMIZER.optimize(expectedPqlBrokerRequest, SCHEMA);
+
+    // Cross compare PQL and SQL BrokerRequest
+    compareBrokerRequest(actualPqlBrokerRequest, expectedPqlBrokerRequest);
+    compareBrokerRequest(actualPqlBrokerRequest, expectedSqlBrokerRequest);
+    compareBrokerRequest(actualSqlBrokerRequest, expectedPqlBrokerRequest);
+    compareBrokerRequest(actualSqlBrokerRequest, expectedSqlBrokerRequest);
+    comparePinotQuery(actualPinotQuery, expectedPinotQuery);
+  }
+
+  private static void compareBrokerRequest(BrokerRequest actual, BrokerRequest expected) {
+    if (expected.getFilterQuery() == null) {
+      assertNull(actual.getFilterQuery());
+      return;
+    }
+    FilterQueryTree actualFilter = RequestUtils
+        .buildFilterQuery(actual.getFilterQuery().getId(), actual.getFilterSubQueryMap().getFilterQueryMap());
+    FilterQueryTree expectedFilter = RequestUtils
+        .buildFilterQuery(expected.getFilterQuery().getId(), expected.getFilterSubQueryMap().getFilterQueryMap());
+    compareFilterQueryTree(actualFilter, expectedFilter);
+  }
+
+  private static void compareFilterQueryTree(FilterQueryTree actual, FilterQueryTree expected) {
+    assertEquals(actual.getOperator(), expected.getOperator());
+    FilterOperator operator = actual.getOperator();
+    if (operator == FilterOperator.AND || operator == FilterOperator.OR) {
+      assertNull(actual.getColumn());
+      assertNull(actual.getValue());
+      compareFilterQueryTreeChildren(actual.getChildren(), expected.getChildren());
+    } else {
+      assertEquals(actual.getColumn(), expected.getColumn());
+      assertNull(actual.getChildren());
+      if (operator == FilterOperator.IN || operator == FilterOperator.NOT_IN) {
+        assertEqualsNoOrder(actual.getValue().toArray(), expected.getValue().toArray());
+      } else {
+        assertEquals(actual.getValue(), expected.getValue());
+      }
+    }
+  }
+
+  /**
+   * Handles different order of children under AND/OR filter.
+   */
+  private static void compareFilterQueryTreeChildren(List<FilterQueryTree> actual, List<FilterQueryTree> expected) {
+    assertEquals(actual.size(), expected.size());
+    List<FilterQueryTree> unmatchedExpectedChildren = new ArrayList<>(expected);
+    for (FilterQueryTree actualChild : actual) {
+      Iterator<FilterQueryTree> iterator = unmatchedExpectedChildren.iterator();
+      boolean findMatchingChild = false;
+      while (iterator.hasNext()) {
+        try {
+          compareFilterQueryTree(actualChild, iterator.next());
+          iterator.remove();
+          findMatchingChild = true;
+          break;
+        } catch (AssertionError e) {
+          // Ignore
+        }
+      }
+      if (!findMatchingChild) {
+        fail("Failed to find matching child");
+      }
+    }
+  }
+
+  private static void comparePinotQuery(PinotQuery actual, PinotQuery expected) {
+    if (expected.getFilterExpression() == null) {
+      assertNull(actual.getFilterExpression());
+      return;
+    }
+    compareFilterExpression(actual.getFilterExpression(), expected.getFilterExpression());
+  }
+
+  private static void compareFilterExpression(Expression actual, Expression expected) {
+    Function actualFilterFunction = actual.getFunctionCall();
+    Function expectedFilterFunction = expected.getFunctionCall();
+    FilterKind actualFilterKind = FilterKind.valueOf(actualFilterFunction.getOperator());
+    FilterKind expectedFilterKind = FilterKind.valueOf(expectedFilterFunction.getOperator());
+    List<Expression> actualOperands = actualFilterFunction.getOperands();
+    List<Expression> expectedOperands = expectedFilterFunction.getOperands();
+    if (!actualFilterKind.isRange()) {
+      assertEquals(actualFilterKind, expectedFilterKind);
+      assertEquals(actualOperands.size(), expectedOperands.size());
+      if (actualFilterKind == FilterKind.AND || actualFilterKind == FilterKind.OR) {
+        compareFilterExpressionChildren(actualOperands, expectedOperands);
+      } else {
+        assertEquals(actualOperands.get(0), expectedOperands.get(0));
+        if (actualFilterKind == FilterKind.IN || actualFilterKind == FilterKind.NOT_IN) {
+          // Handle different order of values
+          assertEqualsNoOrder(actualOperands.toArray(), expectedOperands.toArray());
+        } else {
+          assertEquals(actualOperands, expectedOperands);
+        }
+      }
+    } else {
+      assertTrue(expectedFilterKind.isRange());
+      assertEquals(getRangeString(actualFilterKind, actualOperands),
+          getRangeString(expectedFilterKind, expectedOperands));
+    }
+  }
+
+  /**
+   * Handles different order of children under AND/OR filter.
+   */
+  private static void compareFilterExpressionChildren(List<Expression> actual, List<Expression> expected) {
+    assertEquals(actual.size(), expected.size());
+    List<Expression> unmatchedExpectedChildren = new ArrayList<>(expected);
+    for (Expression actualChild : actual) {
+      Iterator<Expression> iterator = unmatchedExpectedChildren.iterator();
+      boolean findMatchingChild = false;
+      while (iterator.hasNext()) {
+        try {
+          compareFilterExpression(actualChild, iterator.next());
+          iterator.remove();
+          findMatchingChild = true;
+          break;
+        } catch (AssertionError e) {
+          // Ignore
+        }
+      }
+      if (!findMatchingChild) {
+        fail("Failed to find matching child");
+      }
+    }
+  }
+
+  private static String getRangeString(FilterKind filterKind, List<Expression> operands) {
+    // TODO: Use the new delimiter after releasing 0.6.0
+    switch (filterKind) {
+      case GREATER_THAN:
+        return "(" + operands.get(1).getLiteral().getFieldValue().toString() + "\t\t*)";
+      case GREATER_THAN_OR_EQUAL:
+        return "[" + operands.get(1).getLiteral().getFieldValue().toString() + "\t\t*)";
+      case LESS_THAN:
+        return "(*\t\t" + operands.get(1).getLiteral().getFieldValue().toString() + ")";
+      case LESS_THAN_OR_EQUAL:
+        return "(*\t\t" + operands.get(1).getLiteral().getFieldValue().toString() + "]";
+      case BETWEEN:
+        return "[" + operands.get(1).getLiteral().getFieldValue().toString() + "\t\t" + operands.get(2).getLiteral()
+            .getFieldValue().toString() + "]";
+      case RANGE:
+        return operands.get(1).getLiteral().getStringValue();
+      default:
+        throw new IllegalStateException();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org