You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/30 04:16:47 UTC
[incubator-pinot] branch master updated: Add FilterOptimizer which
supports optimizing both PQL and SQL query filter (#6056)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 009ab53 Add FilterOptimizer which supports optimizing both PQL and SQL query filter (#6056)
009ab53 is described below
commit 009ab53d1943829803fe308205dd951be2caffcb
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 29 21:16:34 2020 -0700
Add FilterOptimizer which supports optimizing both PQL and SQL query filter (#6056)
Add `FilterOptimizer` which supports optimizing query filter from both `BrokerRequest` and `PinotQuery`.
`FilterOptimizer` will replace `FilterQueryTreeOptimizer` which only works on `BrokerRequest` query filter.
In order to fully support SQL (#4219), the query optimizer should perform the same optimization to `PinotQuery` as `BrokerRequest`.
Add `FlattenAndOrFilterOptimizer` to replace `FlattenNestedPredicatesFilterQueryTreeOptimizer`, and remove the limitation on the flatten depth
Add `MergeEqInFilterOptimizer` to replace `MultipleOrEqualitiesToInClauseFilterQueryTreeOptimizer`
Add `MergeRangeFilterOptimizer` to replace `RangeMergeOptimizer`, and support merging ranges for all single-value columns (based on the schema)
This PR only adds the new code. The following PR will wire the new code and remove the old code.
---
.../pinot/core/query/optimizer/QueryOptimizer.java | 68 +++
.../query/optimizer/filter/FilterOptimizer.java | 42 ++
.../filter/FlattenAndOrFilterOptimizer.java | 88 ++++
.../optimizer/filter/MergeEqInFilterOptimizer.java | 263 +++++++++++
.../filter/MergeRangeFilterOptimizer.java | 354 +++++++++++++++
.../request/context/predicate/RangePredicate.java | 2 +-
.../core/query/optimizer/QueryOptimizerTest.java | 495 +++++++++++++++++++++
7 files changed, 1311 insertions(+), 1 deletion(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
new file mode 100644
index 0000000..922fcc2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer;
+
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.optimizer.filter.FilterOptimizer;
+import org.apache.pinot.core.query.optimizer.filter.FlattenAndOrFilterOptimizer;
+import org.apache.pinot.core.query.optimizer.filter.MergeEqInFilterOptimizer;
+import org.apache.pinot.core.query.optimizer.filter.MergeRangeFilterOptimizer;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class QueryOptimizer {
+ // Applied in list order: the AND/OR filters are flattened first, then EQ/IN predicates and RANGE predicates are merged
+ private static final List<FilterOptimizer> FILTER_OPTIMIZERS = Arrays
+ .asList(new FlattenAndOrFilterOptimizer(), new MergeEqInFilterOptimizer(), new MergeRangeFilterOptimizer());
+
+ /**
+ * Optimizes the filter of the given PQL query. The filter tree is built from the request, run through every filter
+ * optimizer in order, then handed back to the request via {@code RequestUtils.generateFilterFromTree}. Does nothing
+ * when the query has no filter.
+ *
+ * @param brokerRequest PQL query to optimize
+ * @param schema Schema passed to each filter optimizer, may be {@code null}
+ */
+ public void optimize(BrokerRequest brokerRequest, @Nullable Schema schema) {
+ FilterQuery rootFilter = brokerRequest.getFilterQuery();
+ if (rootFilter == null) {
+ return;
+ }
+ FilterQueryTree optimizedTree =
+ RequestUtils.buildFilterQuery(rootFilter.getId(), brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+ for (FilterOptimizer optimizer : FILTER_OPTIMIZERS) {
+ optimizedTree = optimizer.optimize(optimizedTree, schema);
+ }
+ RequestUtils.generateFilterFromTree(optimizedTree, brokerRequest);
+ }
+
+ /**
+ * Optimizes the filter of the given SQL query. The filter expression is run through every filter optimizer in order
+ * and the result is set back on the query. Does nothing when the query has no filter.
+ *
+ * @param pinotQuery SQL query to optimize
+ * @param schema Schema passed to each filter optimizer, may be {@code null}
+ */
+ public void optimize(PinotQuery pinotQuery, @Nullable Schema schema) {
+ Expression optimizedFilter = pinotQuery.getFilterExpression();
+ if (optimizedFilter == null) {
+ return;
+ }
+ for (FilterOptimizer optimizer : FILTER_OPTIMIZERS) {
+ optimizedFilter = optimizer.optimize(optimizedFilter, schema);
+ }
+ pinotQuery.setFilterExpression(optimizedFilter);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FilterOptimizer.java
new file mode 100644
index 0000000..e49c9a6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FilterOptimizer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Interface for filter optimizers. Declares one {@code optimize} overload per filter representation:
+ * {@link FilterQueryTree} and {@link Expression}.
+ * TODO: Support AlwaysTrueFilter and AlwaysFalseFilter
+ */
+public interface FilterOptimizer {
+
+ /**
+ * Optimizes the given filter, returns the optimized filter.
+ *
+ * @param filterQueryTree Filter to optimize, in {@link FilterQueryTree} form
+ * @param schema Schema made available to the optimizer, may be {@code null}
+ * @return The optimized filter
+ */
+ FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema);
+
+ /**
+ * Optimizes the given filter, returns the optimized filter.
+ *
+ * @param filterExpression Filter to optimize, in {@link Expression} form
+ * @param schema Schema made available to the optimizer, may be {@code null}
+ * @return The optimized filter
+ */
+ Expression optimize(Expression filterExpression, @Nullable Schema schema);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FlattenAndOrFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FlattenAndOrFilterOptimizer.java
new file mode 100644
index 0000000..37677d1
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/FlattenAndOrFilterOptimizer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * The {@code FlattenAndOrFilterOptimizer} flattens the nested AND/OR filters. For example, AND(a, AND(b, c)) can
+ * be flattened to AND(a, b, c).
+ */
+public class FlattenAndOrFilterOptimizer implements FilterOptimizer {
+
+ @Override
+ public FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema) {
+ return optimize(filterQueryTree);
+ }
+
+ private FilterQueryTree optimize(FilterQueryTree filterQueryTree) {
+ FilterOperator operator = filterQueryTree.getOperator();
+ if (operator != FilterOperator.AND && operator != FilterOperator.OR) {
+ return filterQueryTree;
+ }
+ List<FilterQueryTree> children = filterQueryTree.getChildren();
+ assert children != null;
+ List<FilterQueryTree> newChildren = new ArrayList<>();
+ for (FilterQueryTree child : children) {
+ FilterQueryTree optimizedChild = optimize(child);
+ if (optimizedChild.getOperator() == operator) {
+ newChildren.addAll(optimizedChild.getChildren());
+ } else {
+ newChildren.add(optimizedChild);
+ }
+ }
+ return new FilterQueryTree(null, null, operator, newChildren);
+ }
+
+ @Override
+ public Expression optimize(Expression filterExpression, @Nullable Schema schema) {
+ return optimize(filterExpression);
+ }
+
+ private Expression optimize(Expression filterExpression) {
+ Function function = filterExpression.getFunctionCall();
+ String operator = function.getOperator();
+ if (!operator.equals(FilterKind.AND.name()) && !operator.equals(FilterKind.OR.name())) {
+ return filterExpression;
+ }
+ List<Expression> children = function.getOperands();
+ assert children != null;
+ List<Expression> newChildren = new ArrayList<>();
+ for (Expression child : children) {
+ Expression optimizedChild = optimize(child);
+ Function childFunction = optimizedChild.getFunctionCall();
+ if (childFunction.getOperator().equals(operator)) {
+ newChildren.addAll(childFunction.getOperands());
+ } else {
+ newChildren.add(optimizedChild);
+ }
+ }
+ function.setOperands(newChildren);
+ return filterExpression;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeEqInFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeEqInFilterOptimizer.java
new file mode 100644
index 0000000..3c7c321
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeEqInFilterOptimizer.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * The {@code MergeEqInFilterOptimizer} merges EQ and IN predicates on the same column joined by OR, and performs the
+ * following optimizations:
+ * <ul>
+ * <li>Merge multiple EQ and IN predicates into one IN predicate (or one EQ predicate if possible)</li>
+ * <li>De-duplicates the values in the IN predicate</li>
+ * <li>Converts single value IN predicate to EQ predicate</li>
+ * <li>Pulls up the merged predicate in the absence of other predicates</li>
+ * </ul>
+ *
+ * NOTE: This optimizer follows the {@link FlattenAndOrFilterOptimizer}, so all the AND/OR filters are already
+ * flattened.
+ */
+public class MergeEqInFilterOptimizer implements FilterOptimizer {
+
+ /**
+ * Merges the EQ and IN predicates of the given filter tree:
+ * <ul>
+ * <li>OR: merges the EQUALITY/IN children per column; the children of AND children are optimized recursively</li>
+ * <li>AND: optimizes each child in place and returns the same filter</li>
+ * <li>IN: de-duplicates the values, rewriting to EQUALITY when a single unique value remains</li>
+ * <li>Others: returned as is</li>
+ * </ul>
+ * The schema is only passed along to the recursive calls.
+ */
+ @Override
+ public FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema) {
+ FilterOperator operator = filterQueryTree.getOperator();
+ if (operator == FilterOperator.OR) {
+ List<FilterQueryTree> children = filterQueryTree.getChildren();
+ // Unique values of the EQUALITY and IN children, keyed by column
+ Map<String, Set<String>> valuesMap = new HashMap<>();
+ // Children that are kept (AND and other non-EQUALITY/IN filters); merged predicates are appended afterwards
+ List<FilterQueryTree> newChildren = new ArrayList<>();
+ // Stays false when nothing can be merged, de-duplicated or rewritten, in which case the given filter is returned
+ boolean recreateFilter = false;
+
+ // Iterate over all the child filters to merge EQ and IN predicates
+ for (FilterQueryTree child : children) {
+ FilterOperator childOperator = child.getOperator();
+ // Nested OR is not expected here because the filter is already flattened
+ assert childOperator != FilterOperator.OR;
+ if (childOperator == FilterOperator.AND) {
+ // Optimize the children of the AND filter in place, then keep the AND filter itself
+ child.getChildren().replaceAll(c -> optimize(c, schema));
+ newChildren.add(child);
+ } else if (childOperator == FilterOperator.EQUALITY) {
+ String column = child.getColumn();
+ String value = child.getValue().get(0);
+ Set<String> values = valuesMap.get(column);
+ if (values == null) {
+ // First predicate on this column: a lone EQUALITY does not require recreating the filter
+ values = new HashSet<>();
+ values.add(value);
+ valuesMap.put(column, values);
+ } else {
+ values.add(value);
+ // Recreate filter when multiple predicates can be merged
+ recreateFilter = true;
+ }
+ } else if (childOperator == FilterOperator.IN) {
+ String column = child.getColumn();
+ List<String> inPredicateValuesList = child.getValue();
+ Set<String> inPredicateValuesSet = new HashSet<>(inPredicateValuesList);
+ int numUniqueValues = inPredicateValuesSet.size();
+ if (numUniqueValues == 1 || numUniqueValues != inPredicateValuesList.size()) {
+ // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+ // can be de-duplicated
+ recreateFilter = true;
+ }
+ Set<String> values = valuesMap.get(column);
+ if (values == null) {
+ valuesMap.put(column, inPredicateValuesSet);
+ } else {
+ values.addAll(inPredicateValuesSet);
+ // Recreate filter when multiple predicates can be merged
+ recreateFilter = true;
+ }
+ } else {
+ // Any other predicate is kept as is
+ newChildren.add(child);
+ }
+ }
+
+ if (recreateFilter) {
+ if (newChildren.isEmpty() && valuesMap.size() == 1) {
+ // Single predicate without other filters
+ Map.Entry<String, Set<String>> entry = valuesMap.entrySet().iterator().next();
+ return getFilterQueryTree(entry.getKey(), entry.getValue());
+ } else {
+ // Append one EQ/IN predicate per column after the kept children (HashMap iteration order)
+ for (Map.Entry<String, Set<String>> entry : valuesMap.entrySet()) {
+ newChildren.add(getFilterQueryTree(entry.getKey(), entry.getValue()));
+ }
+ return new FilterQueryTree(null, null, FilterOperator.OR, newChildren);
+ }
+ } else {
+ // Nothing to merge; AND children (if any) have already been optimized in place
+ return filterQueryTree;
+ }
+ } else if (operator == FilterOperator.AND) {
+ filterQueryTree.getChildren().replaceAll(c -> optimize(c, schema));
+ return filterQueryTree;
+ } else if (operator == FilterOperator.IN) {
+ String column = filterQueryTree.getColumn();
+ List<String> valuesList = filterQueryTree.getValue();
+ Set<String> values = new HashSet<>(valuesList);
+ int numUniqueValues = values.size();
+ if (numUniqueValues == 1 || numUniqueValues != valuesList.size()) {
+ // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+ // can be de-duplicated
+ return getFilterQueryTree(column, values);
+ } else {
+ return filterQueryTree;
+ }
+ } else {
+ return filterQueryTree;
+ }
+ }
+
+ /**
+ * Builds the predicate FilterQueryTree for the given column and values: EQUALITY when there is exactly one value, IN
+ * otherwise.
+ */
+ private static FilterQueryTree getFilterQueryTree(String column, Set<String> values) {
+ List<String> valueList = new ArrayList<>(values);
+ FilterOperator predicateOperator;
+ if (values.size() == 1) {
+ predicateOperator = FilterOperator.EQUALITY;
+ } else {
+ predicateOperator = FilterOperator.IN;
+ }
+ return new FilterQueryTree(column, valueList, predicateOperator, null);
+ }
+
+ /**
+ * Merges the EQ and IN predicates of the given filter expression:
+ * <ul>
+ * <li>OR: merges the EQUALS/IN operands per left-hand side expression; the operands of AND operands are optimized
+ * recursively</li>
+ * <li>AND: optimizes each operand in place and returns the same expression</li>
+ * <li>IN: de-duplicates the values, rewriting to EQUALS when a single unique value remains</li>
+ * <li>Others: returned as is</li>
+ * </ul>
+ * The schema is only passed along to the recursive calls.
+ */
+ @Override
+ public Expression optimize(Expression filterExpression, @Nullable Schema schema) {
+ Function function = filterExpression.getFunctionCall();
+ String operator = function.getOperator();
+ if (operator.equals(FilterKind.OR.name())) {
+ List<Expression> children = function.getOperands();
+ // Unique values of the EQUALS and IN operands, keyed by the left-hand side expression
+ Map<Expression, Set<Expression>> valuesMap = new HashMap<>();
+ // Operands that are kept (AND and other non-EQUALS/IN filters); merged predicates are appended afterwards
+ List<Expression> newChildren = new ArrayList<>();
+ // Stays false when nothing can be merged, de-duplicated or rewritten, in which case the given filter is returned
+ boolean recreateFilter = false;
+
+ // Iterate over all the child filters to merge EQ and IN predicates
+ for (Expression child : children) {
+ Function childFunction = child.getFunctionCall();
+ String childOperator = childFunction.getOperator();
+ // Nested OR is not expected here because the filter is already flattened
+ assert !childOperator.equals(FilterKind.OR.name());
+ if (childOperator.equals(FilterKind.AND.name())) {
+ // Optimize the operands of the AND filter in place, then keep the AND filter itself
+ childFunction.getOperands().replaceAll(o -> optimize(o, schema));
+ newChildren.add(child);
+ } else if (childOperator.equals(FilterKind.EQUALS.name())) {
+ // Operand 0 is the left-hand side, operand 1 is the value
+ List<Expression> operands = childFunction.getOperands();
+ Expression lhs = operands.get(0);
+ Expression value = operands.get(1);
+ Set<Expression> values = valuesMap.get(lhs);
+ if (values == null) {
+ // First predicate on this left-hand side: a lone EQUALS does not require recreating the filter
+ values = new HashSet<>();
+ values.add(value);
+ valuesMap.put(lhs, values);
+ } else {
+ values.add(value);
+ // Recreate filter when multiple predicates can be merged
+ recreateFilter = true;
+ }
+ } else if (childOperator.equals(FilterKind.IN.name())) {
+ // Operand 0 is the left-hand side, the remaining operands are the values
+ List<Expression> operands = childFunction.getOperands();
+ Expression lhs = operands.get(0);
+ Set<Expression> inPredicateValuesSet = new HashSet<>();
+ int numOperands = operands.size();
+ for (int i = 1; i < numOperands; i++) {
+ inPredicateValuesSet.add(operands.get(i));
+ }
+ int numUniqueValues = inPredicateValuesSet.size();
+ if (numUniqueValues == 1 || numUniqueValues != numOperands - 1) {
+ // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+ // can be de-duplicated
+ recreateFilter = true;
+ }
+ Set<Expression> values = valuesMap.get(lhs);
+ if (values == null) {
+ valuesMap.put(lhs, inPredicateValuesSet);
+ } else {
+ values.addAll(inPredicateValuesSet);
+ // Recreate filter when multiple predicates can be merged
+ recreateFilter = true;
+ }
+ } else {
+ // Any other predicate is kept as is
+ newChildren.add(child);
+ }
+ }
+
+ if (recreateFilter) {
+ if (newChildren.isEmpty() && valuesMap.size() == 1) {
+ // Single predicate without other filters
+ Map.Entry<Expression, Set<Expression>> entry = valuesMap.entrySet().iterator().next();
+ return getFilterExpression(entry.getKey(), entry.getValue());
+ } else {
+ // Append one EQ/IN predicate per left-hand side after the kept operands (HashMap iteration order), and
+ // reuse the given OR expression
+ for (Map.Entry<Expression, Set<Expression>> entry : valuesMap.entrySet()) {
+ newChildren.add(getFilterExpression(entry.getKey(), entry.getValue()));
+ }
+ function.setOperands(newChildren);
+ return filterExpression;
+ }
+ } else {
+ // Nothing to merge; AND operands (if any) have already been optimized in place
+ return filterExpression;
+ }
+ } else if (operator.equals(FilterKind.AND.name())) {
+ function.getOperands().replaceAll(c -> optimize(c, schema));
+ return filterExpression;
+ } else if (operator.equals(FilterKind.IN.name())) {
+ // Operand 0 is the left-hand side, the remaining operands are the values
+ List<Expression> operands = function.getOperands();
+ Expression lhs = operands.get(0);
+ Set<Expression> values = new HashSet<>();
+ int numOperands = operands.size();
+ for (int i = 1; i < numOperands; i++) {
+ values.add(operands.get(i));
+ }
+ int numUniqueValues = values.size();
+ if (numUniqueValues == 1 || numUniqueValues != numOperands - 1) {
+ // Recreate filter when the IN predicate contains only 1 value (can be rewritten to EQ predicate), or values
+ // can be de-duplicated
+ return getFilterExpression(lhs, values);
+ } else {
+ return filterExpression;
+ }
+ } else {
+ return filterExpression;
+ }
+ }
+
+ /**
+ * Builds the predicate filter Expression for the given lhs and values: EQUALS(lhs, value) when there is exactly one
+ * value, IN(lhs, values...) otherwise.
+ */
+ private static Expression getFilterExpression(Expression lhs, Set<Expression> values) {
+ int valueCount = values.size();
+ if (valueCount != 1) {
+ // IN predicate: the lhs comes first, followed by all the values
+ Expression inFilter = RequestUtils.getFunctionExpression(FilterKind.IN.name());
+ List<Expression> inOperands = new ArrayList<>(valueCount + 1);
+ inOperands.add(lhs);
+ inOperands.addAll(values);
+ inFilter.getFunctionCall().setOperands(inOperands);
+ return inFilter;
+ }
+ // EQ predicate on the only value
+ Expression eqFilter = RequestUtils.getFunctionExpression(FilterKind.EQUALS.name());
+ Expression onlyValue = values.iterator().next();
+ eqFilter.getFunctionCall().setOperands(Arrays.asList(lhs, onlyValue));
+ return eqFilter;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java
new file mode 100644
index 0000000..2a01395
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+
+/**
+ * The {@code MergeRangeFilterOptimizer} merges multiple RANGE predicates on the same column joined by AND by taking
+ * their intersection. It also pulls up the merged predicate in the absence of other predicates.
+ *
+ * NOTE: This optimizer follows the {@link FlattenAndOrFilterOptimizer}, so all the AND/OR filters are already
+ * flattened.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MergeRangeFilterOptimizer implements FilterOptimizer {
+
+ /**
+ * Merges the RANGE predicates of the given filter tree:
+ * <ul>
+ * <li>AND: intersects the RANGE children per column; the children of OR children are optimized recursively</li>
+ * <li>OR: optimizes each child in place and returns the same filter</li>
+ * <li>Others: returned as is</li>
+ * </ul>
+ * The filter is returned untouched when no schema is given, because the field spec of each column is looked up from
+ * the schema.
+ */
+ @Override
+ public FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema) {
+ if (schema == null) {
+ return filterQueryTree;
+ }
+ FilterOperator operator = filterQueryTree.getOperator();
+ if (operator == FilterOperator.AND) {
+ List<FilterQueryTree> children = filterQueryTree.getChildren();
+ // Intersection of the RANGE children, keyed by column
+ Map<String, Range> rangeMap = new HashMap<>();
+ // Children that are kept (OR, skipped RANGE and other filters); merged ranges are appended afterwards
+ List<FilterQueryTree> newChildren = new ArrayList<>();
+ // Stays false unless at least two ranges on the same column are merged, in which case the given filter is
+ // returned
+ boolean recreateFilter = false;
+
+ // Iterate over all the child filters to create and merge ranges
+ for (FilterQueryTree child : children) {
+ FilterOperator childOperator = child.getOperator();
+ // Nested AND is not expected here because the filter is already flattened
+ assert childOperator != FilterOperator.AND;
+ if (childOperator == FilterOperator.OR) {
+ // Optimize the children of the OR filter in place, then keep the OR filter itself
+ child.getChildren().replaceAll(c -> optimize(c, schema));
+ newChildren.add(child);
+ } else if (childOperator == FilterOperator.RANGE) {
+ String column = child.getColumn();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ if (fieldSpec == null || !fieldSpec.isSingleValueField()) {
+ // Skip optimizing transform expression and multi-value column
+ // NOTE: We cannot optimize multi-value column because [0, 10] will match filter "col < 1 AND col > 9", but
+ // not the merged one.
+ newChildren.add(child);
+ continue;
+ }
+ // Create a range and merge with current range if exists
+ // The range string is read from the first value of the predicate
+ Range range = getRange(child.getValue().get(0), fieldSpec.getDataType());
+ Range currentRange = rangeMap.get(column);
+ if (currentRange == null) {
+ rangeMap.put(column, range);
+ } else {
+ // Narrows the range already stored in the map
+ currentRange.intersect(range);
+ recreateFilter = true;
+ }
+ } else {
+ // Any other predicate is kept as is
+ newChildren.add(child);
+ }
+ }
+
+ if (recreateFilter) {
+ if (newChildren.isEmpty() && rangeMap.size() == 1) {
+ // Single range without other filters
+ Map.Entry<String, Range> entry = rangeMap.entrySet().iterator().next();
+ return getRangeFilterQueryTree(entry.getKey(), entry.getValue());
+ } else {
+ // Append one RANGE predicate per column after the kept children (HashMap iteration order)
+ for (Map.Entry<String, Range> entry : rangeMap.entrySet()) {
+ newChildren.add(getRangeFilterQueryTree(entry.getKey(), entry.getValue()));
+ }
+ return new FilterQueryTree(null, null, FilterOperator.AND, newChildren);
+ }
+ } else {
+ // Nothing to merge; OR children (if any) have already been optimized in place
+ return filterQueryTree;
+ }
+ } else if (operator == FilterOperator.OR) {
+ filterQueryTree.getChildren().replaceAll(c -> optimize(c, schema));
+ return filterQueryTree;
+ } else {
+ return filterQueryTree;
+ }
+ }
+
+ /**
+ * Parses the string representation of a range (see {@link RangePredicate}) into a Range whose bounds are converted
+ * to the given data type. The string is split on the legacy delimiter; the first part starts with the lower
+ * inclusive/exclusive marker, the second part ends with the upper one. An unbounded side is stored as {@code null}.
+ */
+ private static Range getRange(String rangeString, DataType dataType) {
+ String[] parts = StringUtils.split(rangeString, RangePredicate.LEGACY_DELIMITER);
+ // Lower part: marker character followed by the bound
+ String lowerPart = parts[0];
+ boolean lowerInclusive = lowerPart.charAt(0) == RangePredicate.LOWER_INCLUSIVE;
+ String lowerString = lowerPart.substring(1);
+ Comparable lowerBound = null;
+ if (!lowerString.equals(RangePredicate.UNBOUNDED)) {
+ lowerBound = getComparable(lowerString, dataType);
+ }
+ // Upper part: bound followed by the marker character
+ String upperPart = parts[1];
+ int markerIndex = upperPart.length() - 1;
+ boolean upperInclusive = upperPart.charAt(markerIndex) == RangePredicate.UPPER_INCLUSIVE;
+ String upperString = upperPart.substring(0, markerIndex);
+ Comparable upperBound = null;
+ if (!upperString.equals(RangePredicate.UNBOUNDED)) {
+ upperBound = getComparable(upperString, dataType);
+ }
+ return new Range(lowerBound, lowerInclusive, upperBound, upperInclusive);
+ }
+
+ /**
+ * Helper method to create a Comparable from the given string value and data type.
+ *
+ * @param stringValue String representation of the value
+ * @param dataType Data type to convert the value to
+ * @return The value converted to the given data type
+ * @throws NumberFormatException if the value cannot be parsed as the given numeric data type
+ * @throws IllegalStateException if the data type is not one of INT, LONG, FLOAT, DOUBLE, STRING or BYTES
+ */
+ private static Comparable getComparable(String stringValue, DataType dataType) {
+ switch (dataType) {
+ case INT:
+ return Integer.parseInt(stringValue);
+ case LONG:
+ return Long.parseLong(stringValue);
+ case FLOAT:
+ return Float.parseFloat(stringValue);
+ case DOUBLE:
+ return Double.parseDouble(stringValue);
+ case STRING:
+ return stringValue;
+ case BYTES:
+ return BytesUtils.toByteArray(stringValue);
+ default:
+ // Report the offending data type, same as the "Unsupported filter kind" error in this class
+ throw new IllegalStateException("Unsupported data type: " + dataType);
+ }
+ }
+
+ /**
+ * Builds a RANGE predicate FilterQueryTree on the given column whose single value is the string representation of
+ * the given range.
+ */
+ private static FilterQueryTree getRangeFilterQueryTree(String column, Range range) {
+ List<String> rangeValue = Collections.singletonList(range.getRangeString());
+ return new FilterQueryTree(column, rangeValue, FilterOperator.RANGE, null);
+ }
+
+ /**
+ * Merges the range predicates of the given filter expression:
+ * <ul>
+ * <li>AND: intersects the range operands per column; the operands of OR operands are optimized recursively</li>
+ * <li>OR: optimizes each operand in place and returns the same expression</li>
+ * <li>Others: returned as is</li>
+ * </ul>
+ * The filter is returned untouched when no schema is given, because the field spec of each column is looked up from
+ * the schema.
+ */
+ @Override
+ public Expression optimize(Expression filterExpression, @Nullable Schema schema) {
+ if (schema == null) {
+ return filterExpression;
+ }
+ Function function = filterExpression.getFunctionCall();
+ String operator = function.getOperator();
+ if (operator.equals(FilterKind.AND.name())) {
+ List<Expression> children = function.getOperands();
+ // Intersection of the range operands, keyed by column name
+ Map<String, Range> rangeMap = new HashMap<>();
+ // Operands that are kept (OR, skipped range and other filters); merged ranges are appended afterwards
+ List<Expression> newChildren = new ArrayList<>();
+ // Stays false unless at least two ranges on the same column are merged, in which case the given filter is
+ // returned
+ boolean recreateFilter = false;
+
+ // Iterate over all the child filters to create and merge ranges
+ for (Expression child : children) {
+ Function childFunction = child.getFunctionCall();
+ // NOTE(review): Enum valueOf() throws IllegalArgumentException for a name that is not a FilterKind constant -
+ // confirm every operator reaching this point is a FilterKind name
+ FilterKind filterKind = FilterKind.valueOf(childFunction.getOperator());
+ // Nested AND is not expected here because the filter is already flattened
+ assert filterKind != FilterKind.AND;
+ if (filterKind == FilterKind.OR) {
+ // Optimize the operands of the OR filter in place, then keep the OR filter itself
+ childFunction.getOperands().replaceAll(o -> optimize(o, schema));
+ newChildren.add(child);
+ } else if (filterKind.isRange()) {
+ // NOTE(review): getRange(FilterKind, ...) has no case for FilterKind.RANGE and throws IllegalStateException
+ // for it - confirm isRange() does not return true for RANGE
+ List<Expression> operands = childFunction.getOperands();
+ Expression lhs = operands.get(0);
+ if (lhs.getType() != ExpressionType.IDENTIFIER) {
+ // Skip optimizing transform expression
+ newChildren.add(child);
+ continue;
+ }
+ String column = lhs.getIdentifier().getName();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+ if (fieldSpec == null || !fieldSpec.isSingleValueField()) {
+ // Skip optimizing column without field spec in the schema and multi-value column
+ // NOTE: We cannot optimize multi-value column because [0, 10] will match filter "col < 1 AND col > 9", but
+ // not the merged one.
+ newChildren.add(child);
+ continue;
+ }
+ // Create a range and merge with current range if exists
+ DataType dataType = fieldSpec.getDataType();
+ Range range = getRange(filterKind, operands, dataType);
+ Range currentRange = rangeMap.get(column);
+ if (currentRange == null) {
+ rangeMap.put(column, range);
+ } else {
+ // Narrows the range already stored in the map
+ currentRange.intersect(range);
+ recreateFilter = true;
+ }
+ } else {
+ // Any other predicate is kept as is
+ newChildren.add(child);
+ }
+ }
+
+ if (recreateFilter) {
+ if (newChildren.isEmpty() && rangeMap.size() == 1) {
+ // Single range without other filters
+ Map.Entry<String, Range> entry = rangeMap.entrySet().iterator().next();
+ return getRangeFilterExpression(entry.getKey(), entry.getValue());
+ } else {
+ // Append one RANGE predicate per column after the kept operands (HashMap iteration order), and reuse the
+ // given AND expression
+ for (Map.Entry<String, Range> entry : rangeMap.entrySet()) {
+ newChildren.add(getRangeFilterExpression(entry.getKey(), entry.getValue()));
+ }
+ function.setOperands(newChildren);
+ return filterExpression;
+ }
+ } else {
+ // Nothing to merge; OR operands (if any) have already been optimized in place
+ return filterExpression;
+ }
+ } else if (operator.equals(FilterKind.OR.name())) {
+ function.getOperands().replaceAll(c -> optimize(c, schema));
+ return filterExpression;
+ } else {
+ return filterExpression;
+ }
+ }
+
+ /**
+ * Builds the Range described by a comparison predicate. Operand 1 (and operand 2 for BETWEEN) is converted to the
+ * given data type and used as the bound; the side that the predicate does not constrain is left unbounded.
+ */
+ private static Range getRange(FilterKind filterKind, List<Expression> operands, DataType dataType) {
+ switch (filterKind) {
+ case GREATER_THAN: {
+ Comparable lowerBound = getComparable(operands.get(1), dataType);
+ return new Range(lowerBound, false, null, false);
+ }
+ case GREATER_THAN_OR_EQUAL: {
+ Comparable lowerBound = getComparable(operands.get(1), dataType);
+ return new Range(lowerBound, true, null, false);
+ }
+ case LESS_THAN: {
+ Comparable upperBound = getComparable(operands.get(1), dataType);
+ return new Range(null, false, upperBound, false);
+ }
+ case LESS_THAN_OR_EQUAL: {
+ Comparable upperBound = getComparable(operands.get(1), dataType);
+ return new Range(null, false, upperBound, true);
+ }
+ case BETWEEN: {
+ // Both ends of BETWEEN are inclusive
+ Comparable lowerBound = getComparable(operands.get(1), dataType);
+ Comparable upperBound = getComparable(operands.get(2), dataType);
+ return new Range(lowerBound, true, upperBound, true);
+ }
+ default:
+ throw new IllegalStateException("Unsupported filter kind: " + filterKind);
+ }
+ }
+
+ /**
+ * Converts the field value of the given literal expression to its string form, then to a Comparable of the given
+ * data type.
+ */
+ private static Comparable getComparable(Expression literalExpression, DataType dataType) {
+ String stringValue = literalExpression.getLiteral().getFieldValue().toString();
+ return getComparable(stringValue, dataType);
+ }
+
+ /**
+ * Builds a RANGE predicate filter Expression with two operands: the identifier of the given column and a literal
+ * holding the string representation of the given range.
+ */
+ private static Expression getRangeFilterExpression(String column, Range range) {
+ Expression rangeFilter = RequestUtils.getFunctionExpression(FilterKind.RANGE.name());
+ Expression columnOperand = RequestUtils.createIdentifierExpression(column);
+ Expression rangeOperand = RequestUtils.getLiteralExpression(range.getRangeString());
+ rangeFilter.getFunctionCall().setOperands(Arrays.asList(columnOperand, rangeOperand));
+ return rangeFilter;
+ }
+
+ /**
+ * Mutable value range. A {@code null} bound means the range is unbounded on that side.
+ */
+ private static class Range {
+ Comparable _lowerBound;
+ boolean _lowerInclusive;
+ Comparable _upperBound;
+ boolean _upperInclusive;
+
+ Range(@Nullable Comparable lowerBound, boolean lowerInclusive, @Nullable Comparable upperBound,
+ boolean upperInclusive) {
+ _lowerBound = lowerBound;
+ _lowerInclusive = lowerInclusive;
+ _upperBound = upperBound;
+ _upperInclusive = upperInclusive;
+ }
+
+ /**
+ * Narrows this range in place to its intersection with the given range. The given range is not modified.
+ */
+ void intersect(Range range) {
+ intersectLower(range._lowerBound, range._lowerInclusive);
+ intersectUpper(range._upperBound, range._upperInclusive);
+ }
+
+ /**
+ * Keeps the larger of the two lower bounds; on a tie the bound stays inclusive only if both are inclusive.
+ */
+ private void intersectLower(@Nullable Comparable otherBound, boolean otherInclusive) {
+ if (otherBound == null) {
+ // The other range does not constrain the lower side
+ return;
+ }
+ if (_lowerBound == null) {
+ _lowerBound = otherBound;
+ _lowerInclusive = otherInclusive;
+ return;
+ }
+ int comparison = _lowerBound.compareTo(otherBound);
+ if (comparison == 0) {
+ _lowerInclusive = _lowerInclusive && otherInclusive;
+ } else if (comparison < 0) {
+ _lowerBound = otherBound;
+ _lowerInclusive = otherInclusive;
+ }
+ }
+
+ /**
+ * Keeps the smaller of the two upper bounds; on a tie the bound stays inclusive only if both are inclusive.
+ */
+ private void intersectUpper(@Nullable Comparable otherBound, boolean otherInclusive) {
+ if (otherBound == null) {
+ // The other range does not constrain the upper side
+ return;
+ }
+ if (_upperBound == null) {
+ _upperBound = otherBound;
+ _upperInclusive = otherInclusive;
+ return;
+ }
+ int comparison = _upperBound.compareTo(otherBound);
+ if (comparison == 0) {
+ _upperInclusive = _upperInclusive && otherInclusive;
+ } else if (comparison > 0) {
+ _upperBound = otherBound;
+ _upperInclusive = otherInclusive;
+ }
+ }
+
+ /**
+ * Returns the string representation of the range. See {@link RangePredicate} for details.
+ * An unbounded side is written with the exclusive marker and the unbounded symbol.
+ */
+ String getRangeString() {
+ StringBuilder rangeBuilder = new StringBuilder();
+ if (_lowerBound != null) {
+ rangeBuilder.append(_lowerInclusive ? RangePredicate.LOWER_INCLUSIVE : RangePredicate.LOWER_EXCLUSIVE);
+ rangeBuilder.append(_lowerBound.toString());
+ } else {
+ rangeBuilder.append(RangePredicate.LOWER_EXCLUSIVE).append(RangePredicate.UNBOUNDED);
+ }
+ // TODO: Switch to RangePredicate.DELIMITER after releasing 0.6.0
+ rangeBuilder.append(RangePredicate.LEGACY_DELIMITER);
+ if (_upperBound != null) {
+ rangeBuilder.append(_upperBound.toString());
+ rangeBuilder.append(_upperInclusive ? RangePredicate.UPPER_INCLUSIVE : RangePredicate.UPPER_EXCLUSIVE);
+ } else {
+ rangeBuilder.append(RangePredicate.UNBOUNDED).append(RangePredicate.UPPER_EXCLUSIVE);
+ }
+ return rangeBuilder.toString();
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java
index d2b5b26..c80bbec 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/RangePredicate.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
*/
public class RangePredicate implements Predicate {
public static final char DELIMITER = '\0';
- // TODO: Remove the legacy delimiter after releasing 0.5.0
+ // TODO: Remove the legacy delimiter after releasing 0.6.0
public static final String LEGACY_DELIMITER = "\t\t";
public static final char LOWER_INCLUSIVE = '[';
public static final char LOWER_EXCLUSIVE = '(';
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/QueryOptimizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/QueryOptimizerTest.java
new file mode 100644
index 0000000..d37bdfc
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/QueryOptimizerTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class QueryOptimizerTest {
+ private static final QueryOptimizer OPTIMIZER = new QueryOptimizer();
+ private static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
+ private static final Pql2Compiler PQL_COMPILER = new Pql2Compiler();
+ private static final Schema SCHEMA =
+ new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("int", DataType.INT)
+ .addSingleValueDimension("long", DataType.LONG).addSingleValueDimension("float", DataType.FLOAT)
+ .addSingleValueDimension("double", DataType.DOUBLE).addSingleValueDimension("string", DataType.STRING)
+ .addSingleValueDimension("bytes", DataType.BYTES).addMultiValueDimension("mvInt", DataType.INT).build();
+
+ /**
+  * A query without a WHERE clause must still have no filter after optimization, for the SQL BrokerRequest, the
+  * PinotQuery and the PQL BrokerRequest.
+  */
+ @Test
+ public void testNoFilter() {
+   String queryWithoutFilter = "SELECT * FROM testTable";
+
+   // Compile all request forms up front; the PinotQuery is the one carried by the SQL BrokerRequest
+   BrokerRequest sqlRequest = SQL_COMPILER.compileToBrokerRequest(queryWithoutFilter);
+   PinotQuery sqlPinotQuery = sqlRequest.getPinotQuery();
+   BrokerRequest pqlRequest = PQL_COMPILER.compileToBrokerRequest(queryWithoutFilter);
+
+   // SQL BrokerRequest
+   OPTIMIZER.optimize(sqlRequest, SCHEMA);
+   assertNull(sqlRequest.getFilterQuery());
+
+   // PinotQuery
+   OPTIMIZER.optimize(sqlPinotQuery, SCHEMA);
+   assertNull(sqlPinotQuery.getFilterExpression());
+
+   // PQL BrokerRequest
+   OPTIMIZER.optimize(pqlRequest, SCHEMA);
+   assertNull(pqlRequest.getFilterQuery());
+ }
+
+ /**
+ * Verifies that nested OR/AND filters are flattened: the optimized filter is a single OR with 4 children, the
+ * second of which is a single AND with 3 children. Checked for the SQL BrokerRequest, the PQL BrokerRequest and
+ * the PinotQuery.
+ */
+ @Test
+ public void testFlattenAndOrFilter() {
+ String query =
+ "SELECT * FROM testTable WHERE ((int = 4 OR (long = 5 AND (float = 9 AND double = 7.5))) OR string = 'foo') OR bytes = 'abc'";
+
+ BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
+ PinotQuery pinotQuery = sqlBrokerRequest.getPinotQuery();
+ BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
+
+ // BrokerRequest path: same expectations for the SQL and the PQL compiled request
+ for (BrokerRequest brokerRequest : Arrays.asList(sqlBrokerRequest, pqlBrokerRequest)) {
+ OPTIMIZER.optimize(brokerRequest, SCHEMA);
+ FilterQueryTree filterQueryTree = RequestUtils.buildFilterQuery(brokerRequest.getFilterQuery().getId(),
+ brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+ // All nested ORs are collapsed into the root OR
+ assertEquals(filterQueryTree.getOperator(), FilterOperator.OR);
+ List<FilterQueryTree> children = filterQueryTree.getChildren();
+ assertEquals(children.size(), 4);
+ assertEquals(children.get(0).toString(), "int EQUALITY [4]");
+ assertEquals(children.get(2).toString(), "string EQUALITY [foo]");
+ assertEquals(children.get(3).toString(), "bytes EQUALITY [abc]");
+
+ // The nested ANDs are collapsed into one AND with 3 children
+ FilterQueryTree andFilter = children.get(1);
+ assertEquals(andFilter.getOperator(), FilterOperator.AND);
+ List<FilterQueryTree> andFilterChildren = andFilter.getChildren();
+ assertEquals(andFilterChildren.size(), 3);
+ assertEquals(andFilterChildren.get(0).toString(), "long EQUALITY [5]");
+ assertEquals(andFilterChildren.get(1).toString(), "float EQUALITY [9]");
+ assertEquals(andFilterChildren.get(2).toString(), "double EQUALITY [7.5]");
+ }
+
+ // PinotQuery path: same structure, expressed as filter function expressions
+ OPTIMIZER.optimize(pinotQuery, SCHEMA);
+ Function filterFunction = pinotQuery.getFilterExpression().getFunctionCall();
+ assertEquals(filterFunction.getOperator(), FilterKind.OR.name());
+ List<Expression> children = filterFunction.getOperands();
+ assertEquals(children.size(), 4);
+ assertEquals(children.get(0), getEqFilterExpression("int", 4));
+ assertEquals(children.get(2), getEqFilterExpression("string", "foo"));
+ assertEquals(children.get(3), getEqFilterExpression("bytes", "abc"));
+
+ Function secondChildFunction = children.get(1).getFunctionCall();
+ assertEquals(secondChildFunction.getOperator(), FilterKind.AND.name());
+ List<Expression> secondChildChildren = secondChildFunction.getOperands();
+ assertEquals(secondChildChildren.size(), 3);
+ assertEquals(secondChildChildren.get(0), getEqFilterExpression("long", 5));
+ assertEquals(secondChildChildren.get(1), getEqFilterExpression("float", 9));
+ assertEquals(secondChildChildren.get(2), getEqFilterExpression("double", 7.5));
+ }
+
+ /**
+  * Builds an EQUALS filter expression with the column identifier as first operand and the literal value as second.
+  */
+ private static Expression getEqFilterExpression(String column, Object value) {
+   Expression equalsExpression = RequestUtils.getFunctionExpression(FilterKind.EQUALS.name());
+   Expression columnOperand = RequestUtils.getIdentifierExpression(column);
+   Expression valueOperand = RequestUtils.getLiteralExpression(value);
+   equalsExpression.getFunctionCall().setOperands(Arrays.asList(columnOperand, valueOperand));
+   return equalsExpression;
+ }
+
+ /**
+ * Verifies merging of EQ/IN predicates on the same column under OR, and de-duplication of IN values, for the SQL
+ * BrokerRequest, the PQL BrokerRequest and the PinotQuery. The expected optimized filter is an AND of:
+ * an EQ on "int", an IN on "long" with values 2, 3, 4, and an OR holding the untouched range on "float" plus one
+ * merged IN per column ("float" and "double").
+ */
+ @Test
+ public void testMergeEqInFilter() {
+ String query =
+ "SELECT * FROM testTable WHERE int IN (1, 1) AND (long IN (2, 3) OR long IN (3, 4) OR long = 2) AND (float = 3.5 OR double IN (1.1, 1.2) OR float = 4.5 OR float > 5.5 OR double = 1.3)";
+
+ BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
+ PinotQuery pinotQuery = sqlBrokerRequest.getPinotQuery();
+ BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
+
+ // BrokerRequest path: same expectations for the SQL and the PQL compiled request
+ for (BrokerRequest brokerRequest : Arrays.asList(sqlBrokerRequest, pqlBrokerRequest)) {
+ OPTIMIZER.optimize(brokerRequest, SCHEMA);
+ FilterQueryTree filterQueryTree = RequestUtils.buildFilterQuery(brokerRequest.getFilterQuery().getId(),
+ brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+ assertEquals(filterQueryTree.getOperator(), FilterOperator.AND);
+ List<FilterQueryTree> children = filterQueryTree.getChildren();
+ assertEquals(children.size(), 3);
+ // IN with a single distinct value is expected to become an EQ
+ assertEquals(children.get(0).toString(), "int EQUALITY [1]");
+
+ // The three predicates on "long" are expected to become one IN with the distinct values
+ FilterQueryTree secondChild = children.get(1);
+ assertEquals(secondChild.getColumn(), "long");
+ assertEquals(secondChild.getOperator(), FilterOperator.IN);
+ assertEqualsNoOrder(secondChild.getValue().toArray(), new Object[]{"2", "3", "4"});
+
+ // The range predicate stays as is and comes first, followed by one IN per column
+ FilterQueryTree thirdChild = children.get(2);
+ assertEquals(thirdChild.getOperator(), FilterOperator.OR);
+ List<FilterQueryTree> orFilterChildren = thirdChild.getChildren();
+ assertEquals(orFilterChildren.size(), 3);
+ assertEquals(orFilterChildren.get(0).toString(), "float RANGE [(5.5\t\t*)]");
+
+ // Order of second and third child is not deterministic
+ FilterQueryTree secondOrFilterChild = orFilterChildren.get(1);
+ assertEquals(secondOrFilterChild.getOperator(), FilterOperator.IN);
+ FilterQueryTree thirdOrFilterChild = orFilterChildren.get(2);
+ assertEquals(thirdOrFilterChild.getOperator(), FilterOperator.IN);
+ if (secondOrFilterChild.getColumn().equals("float")) {
+ assertEqualsNoOrder(secondOrFilterChild.getValue().toArray(), new Object[]{"3.5", "4.5"});
+ assertEquals(thirdOrFilterChild.getColumn(), "double");
+ assertEqualsNoOrder(thirdOrFilterChild.getValue().toArray(), new Object[]{"1.1", "1.2", "1.3"});
+ } else {
+ assertEquals(secondOrFilterChild.getColumn(), "double");
+ assertEqualsNoOrder(secondOrFilterChild.getValue().toArray(), new Object[]{"1.1", "1.2", "1.3"});
+ assertEquals(thirdOrFilterChild.getColumn(), "float");
+ assertEqualsNoOrder(thirdOrFilterChild.getValue().toArray(), new Object[]{"3.5", "4.5"});
+ }
+ }
+
+ // PinotQuery path: same structure, expressed as filter function expressions
+ OPTIMIZER.optimize(pinotQuery, SCHEMA);
+ Function filterFunction = pinotQuery.getFilterExpression().getFunctionCall();
+ assertEquals(filterFunction.getOperator(), FilterKind.AND.name());
+ List<Expression> children = filterFunction.getOperands();
+ assertEquals(children.size(), 3);
+ assertEquals(children.get(0), getEqFilterExpression("int", 1));
+ checkInFilterFunction(children.get(1).getFunctionCall(), "long", Arrays.asList(2, 3, 4));
+
+ Function thirdChildFunction = children.get(2).getFunctionCall();
+ assertEquals(thirdChildFunction.getOperator(), FilterKind.OR.name());
+ List<Expression> thirdChildChildren = thirdChildFunction.getOperands();
+ assertEquals(thirdChildChildren.size(), 3);
+ // The range predicate is not rewritten here, it is still a GREATER_THAN function
+ assertEquals(thirdChildChildren.get(0).getFunctionCall().getOperator(), FilterKind.GREATER_THAN.name());
+
+ // Order of second and third child is not deterministic
+ Function secondGrandChildFunction = thirdChildChildren.get(1).getFunctionCall();
+ assertEquals(secondGrandChildFunction.getOperator(), FilterKind.IN.name());
+ Function thirdGrandChildFunction = thirdChildChildren.get(2).getFunctionCall();
+ assertEquals(thirdGrandChildFunction.getOperator(), FilterKind.IN.name());
+ if (secondGrandChildFunction.getOperands().get(0).getIdentifier().getName().equals("float")) {
+ checkInFilterFunction(secondGrandChildFunction, "float", Arrays.asList(3.5, 4.5));
+ checkInFilterFunction(thirdGrandChildFunction, "double", Arrays.asList(1.1, 1.2, 1.3));
+ } else {
+ checkInFilterFunction(secondGrandChildFunction, "double", Arrays.asList(1.1, 1.2, 1.3));
+ checkInFilterFunction(thirdGrandChildFunction, "float", Arrays.asList(3.5, 4.5));
+ }
+ }
+
+ /**
+  * Asserts that the given function is an IN filter on the expected column whose value operands are exactly the
+  * expected values, in any order.
+  *
+  * @param inFilterFunction filter function to check
+  * @param column expected column name (first operand of the function)
+  * @param values expected values (remaining operands of the function), expected to be distinct
+  */
+ private static void checkInFilterFunction(Function inFilterFunction, String column, List<Object> values) {
+   assertEquals(inFilterFunction.getOperator(), FilterKind.IN.name());
+   List<Expression> operands = inFilterFunction.getOperands();
+   int numOperands = operands.size();
+   assertEquals(numOperands, values.size() + 1);
+   assertEquals(operands.get(0).getIdentifier().getName(), column);
+   Set<Expression> unmatchedValueExpressions = new HashSet<>();
+   for (Object value : values) {
+     unmatchedValueExpressions.add(RequestUtils.getLiteralExpression(value));
+   }
+   for (int i = 1; i < numOperands; i++) {
+     Expression operand = operands.get(i);
+     // Remove on match so that a duplicated operand cannot stand in for a missing expected value
+     assertTrue(unmatchedValueExpressions.remove(operand), "Unexpected or duplicate IN value: " + operand);
+   }
+ }
+
+ /**
+ * Verifies merging of range predicates on the same column under AND, for the SQL BrokerRequest, the PQL
+ * BrokerRequest and the PinotQuery. The expected optimized filter is an OR of 4 children: a merged range on "int",
+ * an AND of the EQ and the merged range on "float", a merged range on "string", and an untouched AND of the two
+ * ranges on the multi-value column "mvInt".
+ */
+ @Test
+ public void testMergeRangeFilter() {
+ String query =
+ "SELECT * FROM testTable WHERE (int > 10 AND int <= 100 AND int BETWEEN 10 AND 20) OR (float BETWEEN 5.5 AND 7.5 AND float = 6 AND float < 6.5 AND float BETWEEN 6 AND 8) OR (string > '123' AND string > '23') OR (mvInt > 5 AND mvInt < 0)";
+
+ BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
+ PinotQuery pinotQuery = sqlBrokerRequest.getPinotQuery();
+ BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
+
+ // BrokerRequest path: same expectations for the SQL and the PQL compiled request
+ for (BrokerRequest brokerRequest : Arrays.asList(sqlBrokerRequest, pqlBrokerRequest)) {
+ OPTIMIZER.optimize(brokerRequest, SCHEMA);
+ FilterQueryTree filterQueryTree = RequestUtils.buildFilterQuery(brokerRequest.getFilterQuery().getId(),
+ brokerRequest.getFilterSubQueryMap().getFilterQueryMap());
+ assertEquals(filterQueryTree.getOperator(), FilterOperator.OR);
+ List<FilterQueryTree> children = filterQueryTree.getChildren();
+ assertEquals(children.size(), 4);
+ // The three ranges on "int" collapse into a single range, which replaces the AND
+ assertEquals(children.get(0).toString(), "int RANGE [(10\t\t20]]");
+ // Alphabetical order for STRING column ('23' > '123')
+ assertEquals(children.get(2).toString(), "string RANGE [(23\t\t*)]");
+
+ // The EQ on "float" is kept as is; only the range predicates are merged into one range
+ FilterQueryTree secondChild = children.get(1);
+ assertEquals(secondChild.getOperator(), FilterOperator.AND);
+ assertEquals(secondChild.getChildren().size(), 2);
+ assertEquals(secondChild.getChildren().get(0).toString(), "float EQUALITY [6]");
+ assertEquals(secondChild.getChildren().get(1).toString(), "float RANGE [[6.0\t\t6.5)]");
+
+ // Range filter on multi-value column should not be merged ([-5, 10] can match this filter)
+ FilterQueryTree fourthChild = children.get(3);
+ assertEquals(fourthChild.getOperator(), FilterOperator.AND);
+ assertEquals(fourthChild.getChildren().size(), 2);
+ assertEquals(fourthChild.getChildren().get(0).toString(), "mvInt RANGE [(5\t\t*)]");
+ assertEquals(fourthChild.getChildren().get(1).toString(), "mvInt RANGE [(*\t\t0)]");
+ }
+
+ // PinotQuery path: merged ranges are expected as RANGE functions carrying the range string
+ OPTIMIZER.optimize(pinotQuery, SCHEMA);
+ Function filterFunction = pinotQuery.getFilterExpression().getFunctionCall();
+ assertEquals(filterFunction.getOperator(), FilterKind.OR.name());
+ List<Expression> operands = filterFunction.getOperands();
+ assertEquals(operands.size(), 4);
+ assertEquals(operands.get(0), getRangeFilterExpression("int", "(10\t\t20]"));
+ // Alphabetical order for STRING column ('23' > '123')
+ assertEquals(operands.get(2), getRangeFilterExpression("string", "(23\t\t*)"));
+
+ Function secondChildFunction = operands.get(1).getFunctionCall();
+ assertEquals(secondChildFunction.getOperator(), FilterKind.AND.name());
+ List<Expression> secondChildChildren = secondChildFunction.getOperands();
+ assertEquals(secondChildChildren.size(), 2);
+ assertEquals(secondChildChildren.get(0), getEqFilterExpression("float", 6));
+ assertEquals(secondChildChildren.get(1), getRangeFilterExpression("float", "[6.0\t\t6.5)"));
+
+ // Range filter on multi-value column should not be merged ([-5, 10] can match this filter)
+ Function fourthChildFunction = operands.get(3).getFunctionCall();
+ assertEquals(fourthChildFunction.getOperator(), FilterKind.AND.name());
+ List<Expression> fourthChildChildren = fourthChildFunction.getOperands();
+ assertEquals(fourthChildChildren.size(), 2);
+ assertEquals(fourthChildChildren.get(0).getFunctionCall().getOperator(), FilterKind.GREATER_THAN.name());
+ assertEquals(fourthChildChildren.get(1).getFunctionCall().getOperator(), FilterKind.LESS_THAN.name());
+ }
+
+ /**
+  * Builds a RANGE filter expression with the column identifier as first operand and the range string literal as
+  * second.
+  */
+ private static Expression getRangeFilterExpression(String column, String rangeString) {
+   Expression rangeExpression = RequestUtils.getFunctionExpression(FilterKind.RANGE.name());
+   Expression columnOperand = RequestUtils.getIdentifierExpression(column);
+   Expression rangeOperand = RequestUtils.getLiteralExpression(rangeString);
+   rangeExpression.getFunctionCall().setOperands(Arrays.asList(columnOperand, rangeOperand));
+   return rangeExpression;
+ }
+
+ /**
+ * Table of (query, equivalent expected query) pairs. Each pair is passed to testQuery, which optimizes both sides
+ * and compares the resulting filters.
+ */
+ @Test
+ public void testQueries() {
+ // MergeEqInFilter
+ testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 2 OR int = 3",
+ "SELECT * FROM testTable WHERE int IN (1, 2, 3)");
+ // Only the first two EQs are direct children of the OR; 'int = 3' sits under the AND
+ testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 2 OR int = 3 AND long = 4",
+ "SELECT * FROM testTable WHERE int IN (1, 2) OR (int = 3 AND long = 4)");
+ testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 2 OR int = 3 OR long = 4 OR long = 5 OR long = 6",
+ "SELECT * FROM testTable WHERE int IN (1, 2, 3) OR long IN (4, 5, 6)");
+ // Interleaved columns
+ testQuery("SELECT * FROM testTable WHERE int = 1 OR long = 4 OR int = 2 OR long = 5 OR int = 3 OR long = 6",
+ "SELECT * FROM testTable WHERE int IN (1, 2, 3) OR long IN (4, 5, 6)");
+ // Duplicate values
+ testQuery("SELECT * FROM testTable WHERE int = 1 OR int = 1", "SELECT * FROM testTable WHERE int = 1");
+ testQuery("SELECT * FROM testTable WHERE (int = 1 OR int = 1) AND long = 2",
+ "SELECT * FROM testTable WHERE int = 1 AND long = 2");
+ // Mix of EQ and IN
+ testQuery("SELECT * FROM testTable WHERE int = 1 OR int IN (2, 3, 4, 5)",
+ "SELECT * FROM testTable WHERE int IN (1, 2, 3, 4, 5)");
+ testQuery("SELECT * FROM testTable WHERE int IN (1, 1) OR int = 1", "SELECT * FROM testTable WHERE int = 1");
+ testQuery("SELECT * FROM testTable WHERE string = 'foo' OR string = 'bar' OR string = 'foobar'",
+ "SELECT * FROM testTable WHERE string IN ('foo', 'bar', 'foobar')");
+ testQuery("SELECT * FROM testTable WHERE bytes = 'dead' OR bytes = 'beef' OR bytes = 'deadbeef'",
+ "SELECT * FROM testTable WHERE bytes IN ('dead', 'beef', 'deadbeef')");
+
+ // MergeRangeFilter
+ testQuery("SELECT * FROM testTable WHERE int >= 10 AND int <= 20",
+ "SELECT * FROM testTable WHERE int BETWEEN 10 AND 20");
+ testQuery("SELECT * FROM testTable WHERE int BETWEEN 10 AND 20 AND int > 7 AND int <= 17 OR int > 20",
+ "SELECT * FROM testTable WHERE int BETWEEN 10 AND 17 OR int > 20");
+ testQuery("SELECT * FROM testTable WHERE long BETWEEN 10 AND 20 AND long > 7 AND long <= 17 OR long > 20",
+ "SELECT * FROM testTable WHERE long BETWEEN 10 AND 17 OR long > 20");
+ testQuery("SELECT * FROM testTable WHERE float BETWEEN 10.5 AND 20 AND float > 7 AND float <= 17.5 OR float > 20",
+ "SELECT * FROM testTable WHERE float BETWEEN 10.5 AND 17.5 OR float > 20");
+ testQuery(
+ "SELECT * FROM testTable WHERE double BETWEEN 10.5 AND 20 AND double > 7 AND double <= 17.5 OR double > 20",
+ "SELECT * FROM testTable WHERE double BETWEEN 10.5 AND 17.5 OR double > 20");
+ // For the STRING column the expected bounds differ from the numeric cases above: '7' is the lower bound and '17'
+ // the upper bound
+ testQuery(
+ "SELECT * FROM testTable WHERE string BETWEEN '10' AND '20' AND string > '7' AND string <= '17' OR string > '20'",
+ "SELECT * FROM testTable WHERE string > '7' AND string <= '17' OR string > '20'");
+ testQuery(
+ "SELECT * FROM testTable WHERE bytes BETWEEN '10' AND '20' AND bytes > '07' AND bytes <= '17' OR bytes > '20'",
+ "SELECT * FROM testTable WHERE bytes BETWEEN '10' AND '17' OR bytes > '20'");
+ // Ranges on two different columns under the same AND
+ testQuery(
+ "SELECT * FROM testTable WHERE int > 10 AND long > 20 AND int <= 30 AND long <= 40 AND int >= 15 AND long >= 25",
+ "SELECT * FROM testTable WHERE int BETWEEN 15 AND 30 AND long BETWEEN 25 AND 40");
+ // Ranges under different ANDs are merged per AND, not across the OR
+ testQuery("SELECT * FROM testTable WHERE int > 10 AND int > 20 OR int < 30 AND int < 40",
+ "SELECT * FROM testTable WHERE int > 20 OR int < 30");
+ testQuery("SELECT * FROM testTable WHERE int > 10 AND int > 20 OR long < 30 AND long < 40",
+ "SELECT * FROM testTable WHERE int > 20 OR long < 30");
+
+ // Mixed
+ testQuery(
+ "SELECT * FROM testTable WHERE int >= 20 AND (int > 10 AND (int IN (1, 2) OR (int = 2 OR int = 3)) AND int <= 30)",
+ "SELECT * FROM testTable WHERE int BETWEEN 20 AND 30 AND int IN (1, 2, 3)");
+ }
+
+ /**
+  * Optimizes both queries as SQL BrokerRequest, PinotQuery and PQL BrokerRequest, then asserts that the optimized
+  * filters of the actual query match the optimized filters of the expected query.
+  */
+ private static void testQuery(String actual, String expected) {
+   // Query under test
+   BrokerRequest sqlActual = SQL_COMPILER.compileToBrokerRequest(actual);
+   OPTIMIZER.optimize(sqlActual, SCHEMA);
+   PinotQuery pinotQueryActual = sqlActual.getPinotQuery();
+   OPTIMIZER.optimize(pinotQueryActual, SCHEMA);
+   BrokerRequest pqlActual = PQL_COMPILER.compileToBrokerRequest(actual);
+   OPTIMIZER.optimize(pqlActual, SCHEMA);
+
+   // The expected query is optimized as well because the expected range can only be generated via the optimizer
+   BrokerRequest sqlExpected = SQL_COMPILER.compileToBrokerRequest(expected);
+   OPTIMIZER.optimize(sqlExpected, SCHEMA);
+   PinotQuery pinotQueryExpected = sqlExpected.getPinotQuery();
+   OPTIMIZER.optimize(pinotQueryExpected, SCHEMA);
+   BrokerRequest pqlExpected = PQL_COMPILER.compileToBrokerRequest(expected);
+   OPTIMIZER.optimize(pqlExpected, SCHEMA);
+
+   // BrokerRequests are cross compared between the PQL and the SQL compiler
+   compareBrokerRequest(pqlActual, pqlExpected);
+   compareBrokerRequest(pqlActual, sqlExpected);
+   compareBrokerRequest(sqlActual, pqlExpected);
+   compareBrokerRequest(sqlActual, sqlExpected);
+
+   // PinotQuery only exists on the SQL side
+   comparePinotQuery(pinotQueryActual, pinotQueryExpected);
+ }
+
+ /**
+  * Asserts that both requests carry equivalent filters: either both have no filter, or their filter trees match.
+  */
+ private static void compareBrokerRequest(BrokerRequest actual, BrokerRequest expected) {
+   if (expected.getFilterQuery() != null) {
+     FilterQueryTree actualTree = RequestUtils
+         .buildFilterQuery(actual.getFilterQuery().getId(), actual.getFilterSubQueryMap().getFilterQueryMap());
+     FilterQueryTree expectedTree = RequestUtils
+         .buildFilterQuery(expected.getFilterQuery().getId(), expected.getFilterSubQueryMap().getFilterQueryMap());
+     compareFilterQueryTree(actualTree, expectedTree);
+   } else {
+     // No filter expected, so the actual request must not have one either
+     assertNull(actual.getFilterQuery());
+   }
+ }
+
+ /**
+  * Asserts that two filter trees are equivalent. Children of AND/OR and values of IN/NOT_IN are compared without
+  * regard to order; everything else must be equal.
+  */
+ private static void compareFilterQueryTree(FilterQueryTree actual, FilterQueryTree expected) {
+   FilterOperator operator = actual.getOperator();
+   assertEquals(operator, expected.getOperator());
+
+   boolean logicalNode = operator == FilterOperator.AND || operator == FilterOperator.OR;
+   if (!logicalNode) {
+     // Predicate node: column and value(s), no children
+     assertEquals(actual.getColumn(), expected.getColumn());
+     assertNull(actual.getChildren());
+     boolean unorderedValues = operator == FilterOperator.IN || operator == FilterOperator.NOT_IN;
+     if (unorderedValues) {
+       assertEqualsNoOrder(actual.getValue().toArray(), expected.getValue().toArray());
+     } else {
+       assertEquals(actual.getValue(), expected.getValue());
+     }
+     return;
+   }
+
+   // AND/OR node: children only, no column or value
+   assertNull(actual.getColumn());
+   assertNull(actual.getValue());
+   compareFilterQueryTreeChildren(actual.getChildren(), expected.getChildren());
+ }
+
+ /**
+  * Handles different order of children under AND/OR filter.
+  * <p>Each actual child is paired with the first not yet matched expected child it is equivalent to. The test fails
+  * when an actual child cannot be paired, reporting the child and the remaining candidates.
+  */
+ private static void compareFilterQueryTreeChildren(List<FilterQueryTree> actual, List<FilterQueryTree> expected) {
+   assertEquals(actual.size(), expected.size());
+   List<FilterQueryTree> unmatchedExpectedChildren = new ArrayList<>(expected);
+   for (FilterQueryTree actualChild : actual) {
+     Iterator<FilterQueryTree> iterator = unmatchedExpectedChildren.iterator();
+     boolean findMatchingChild = false;
+     while (iterator.hasNext() && !findMatchingChild) {
+       FilterQueryTree candidate = iterator.next();
+       try {
+         compareFilterQueryTree(actualChild, candidate);
+         iterator.remove();
+         findMatchingChild = true;
+       } catch (AssertionError ignored) {
+         // Intentionally ignored: this candidate is not equivalent, try the next unmatched expected child
+       }
+     }
+     if (!findMatchingChild) {
+       fail("Failed to find matching child for: " + actualChild + ", unmatched expected children: "
+           + unmatchedExpectedChildren);
+     }
+   }
+ }
+
+ /**
+  * Asserts that both queries carry equivalent filters: either both have no filter expression, or their filter
+  * expressions match.
+  */
+ private static void comparePinotQuery(PinotQuery actual, PinotQuery expected) {
+   if (expected.getFilterExpression() != null) {
+     compareFilterExpression(actual.getFilterExpression(), expected.getFilterExpression());
+   } else {
+     // No filter expected, so the actual query must not have one either
+     assertNull(actual.getFilterExpression());
+   }
+ }
+
+ /**
+  * Asserts that two filter expressions are equivalent. Range-like filters are compared through their range string,
+  * children of AND/OR and values of IN/NOT_IN are compared without regard to order, everything else must be equal.
+  */
+ private static void compareFilterExpression(Expression actual, Expression expected) {
+   Function actualFunction = actual.getFunctionCall();
+   Function expectedFunction = expected.getFunctionCall();
+   FilterKind actualKind = FilterKind.valueOf(actualFunction.getOperator());
+   FilterKind expectedKind = FilterKind.valueOf(expectedFunction.getOperator());
+   List<Expression> actualOperands = actualFunction.getOperands();
+   List<Expression> expectedOperands = expectedFunction.getOperands();
+
+   if (actualKind.isRange()) {
+     // Different range kinds can describe the same range, so compare the normalized range strings
+     assertTrue(expectedKind.isRange());
+     assertEquals(getRangeString(actualKind, actualOperands), getRangeString(expectedKind, expectedOperands));
+     return;
+   }
+
+   assertEquals(actualKind, expectedKind);
+   assertEquals(actualOperands.size(), expectedOperands.size());
+   switch (actualKind) {
+     case AND:
+     case OR:
+       compareFilterExpressionChildren(actualOperands, expectedOperands);
+       break;
+     case IN:
+     case NOT_IN:
+       assertEquals(actualOperands.get(0), expectedOperands.get(0));
+       // Handle different order of values
+       assertEqualsNoOrder(actualOperands.toArray(), expectedOperands.toArray());
+       break;
+     default:
+       assertEquals(actualOperands.get(0), expectedOperands.get(0));
+       assertEquals(actualOperands, expectedOperands);
+       break;
+   }
+ }
+
+ /**
+  * Handles different order of children under AND/OR filter.
+  * <p>Each actual child is paired with the first not yet matched expected child it is equivalent to. The test fails
+  * when an actual child cannot be paired, reporting the child and the remaining candidates.
+  */
+ private static void compareFilterExpressionChildren(List<Expression> actual, List<Expression> expected) {
+   assertEquals(actual.size(), expected.size());
+   List<Expression> unmatchedExpectedChildren = new ArrayList<>(expected);
+   for (Expression actualChild : actual) {
+     Iterator<Expression> iterator = unmatchedExpectedChildren.iterator();
+     boolean findMatchingChild = false;
+     while (iterator.hasNext() && !findMatchingChild) {
+       Expression candidate = iterator.next();
+       try {
+         compareFilterExpression(actualChild, candidate);
+         iterator.remove();
+         findMatchingChild = true;
+       } catch (AssertionError ignored) {
+         // Intentionally ignored: this candidate is not equivalent, try the next unmatched expected child
+       }
+     }
+     if (!findMatchingChild) {
+       fail("Failed to find matching child for: " + actualChild + ", unmatched expected children: "
+           + unmatchedExpectedChildren);
+     }
+   }
+ }
+
+ /**
+  * Returns the range string for a range-like filter so that different filter kinds describing the same range can
+  * be compared.
+  *
+  * @param filterKind kind of the filter function
+  * @param operands operands of the filter function (the bound literals start at index 1)
+  * @return range string using the legacy delimiter
+  * @throws IllegalStateException if the filter kind is not one of the range kinds handled below
+  */
+ private static String getRangeString(FilterKind filterKind, List<Expression> operands) {
+   // TODO: Use the new delimiter after releasing 0.6.0
+   switch (filterKind) {
+     case GREATER_THAN:
+       return "(" + getLiteralValueString(operands, 1) + "\t\t*)";
+     case GREATER_THAN_OR_EQUAL:
+       return "[" + getLiteralValueString(operands, 1) + "\t\t*)";
+     case LESS_THAN:
+       return "(*\t\t" + getLiteralValueString(operands, 1) + ")";
+     case LESS_THAN_OR_EQUAL:
+       return "(*\t\t" + getLiteralValueString(operands, 1) + "]";
+     case BETWEEN:
+       return "[" + getLiteralValueString(operands, 1) + "\t\t" + getLiteralValueString(operands, 2) + "]";
+     case RANGE:
+       // Already a range string
+       return operands.get(1).getLiteral().getStringValue();
+     default:
+       throw new IllegalStateException("Unsupported range filter kind: " + filterKind);
+   }
+ }
+
+ /**
+  * Returns the string form of the literal value held by the operand at the given index.
+  */
+ private static String getLiteralValueString(List<Expression> operands, int index) {
+   return operands.get(index).getLiteral().getFieldValue().toString();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org