You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/10/12 01:02:51 UTC
[incubator-pinot] 01/01: Hotfix for Selection Comparator
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch selection_comparator_hotfix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4f4ad6f1174387b0b051656f0fe201dc7901285e
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Fri Oct 11 17:58:52 2019 -0700
Hotfix for Selection Comparator
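- Enhance the comparator for selection order-by queries to resolve one comparator per column up front, instead of switching on the data type for every row comparison
- For 'SELECT *', build identifier expressions directly instead of compiling each column name
- Do not project order-by columns for the 'LIMIT 0' (EmptySelection) case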
---
.../operator/query/SelectionOrderByOperator.java | 87 +++++++++++-----------
.../apache/pinot/core/plan/TransformPlanNode.java | 64 +++++++++-------
.../query/selection/SelectionOperatorService.java | 49 ++++++------
.../query/selection/SelectionOperatorUtils.java | 50 +++++++++++--
4 files changed, 150 insertions(+), 100 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index eee2ceb..f6cf5a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.query;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
@@ -88,51 +89,49 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
}
private Comparator<Serializable[]> getComparator() {
- return (o1, o2) -> {
- int numOrderByExpressions = _sortSequence.size();
- for (int i = 0; i < numOrderByExpressions; i++) {
- // Only compare single-value columns
- if (!_expressionMetadata[i].isSingleValue()) {
- continue;
- }
-
- Serializable v1 = o1[i];
- Serializable v2 = o2[i];
-
- int result;
- switch (_expressionMetadata[i].getDataType()) {
- case INT:
- result = ((Integer) v1).compareTo((Integer) v2);
- break;
- case LONG:
- result = ((Long) v1).compareTo((Long) v2);
- break;
- case FLOAT:
- result = ((Float) v1).compareTo((Float) v2);
- break;
- case DOUBLE:
- result = ((Double) v1).compareTo((Double) v2);
- break;
- case STRING:
- result = ((String) v1).compareTo((String) v2);
- break;
- case BYTES:
- result = ByteArray.compare((byte[]) v1, (byte[]) v2);
- break;
- default:
- throw new IllegalStateException();
- }
-
- if (result != 0) {
- if (_sortSequence.get(i).isIsAsc()) {
- return -result;
- } else {
- return result;
- }
- }
+ // Compare only single-value order-by columns; multi-value columns are skipped, as in the old per-row loop
+ int numOrderByExpressions = _sortSequence.size();
+ List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+ for (int i = 0; i < numOrderByExpressions; i++) {
+ if (_expressionMetadata[i].isSingleValue()) {
+ valueIndexList.add(i);
}
- return 0;
- };
+ }
+ // Resolve one comparator per compared column up front so RowComparator.compare() does no per-row type switch
+ int numValuesToCompare = valueIndexList.size();
+ int[] valueIndices = new int[numValuesToCompare];
+ Comparator[] valueComparators = new Comparator[numValuesToCompare];
+ for (int i = 0; i < numValuesToCompare; i++) {
+ int valueIndex = valueIndexList.get(i);
+ valueIndices[i] = valueIndex;
+ switch (_expressionMetadata[valueIndex].getDataType()) {
+ case INT:
+ valueComparators[i] = (Comparator<Integer>) Integer::compare;
+ break;
+ case LONG:
+ valueComparators[i] = (Comparator<Long>) Long::compare;
+ break;
+ case FLOAT:
+ valueComparators[i] = (Comparator<Float>) Float::compare;
+ break;
+ case DOUBLE:
+ valueComparators[i] = (Comparator<Double>) Double::compare;
+ break;
+ case STRING:
+ valueComparators[i] = Comparator.naturalOrder();
+ break;
+ case BYTES:
+ valueComparators[i] = (Comparator<byte[]>) ByteArray::compare;
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ if (_sortSequence.get(valueIndex).isIsAsc()) { // ASC reverses natural order, matching the old '-result' branch
+ valueComparators[i] = valueComparators[i].reversed();
+ }
+ }
+
+ return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index d8ceebb..84d9287 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.plan;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -31,6 +30,7 @@ import org.apache.pinot.common.request.transform.TransformExpressionTree;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,48 +59,60 @@ public class TransformPlanNode implements PlanNode {
*/
private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegment indexSegment) {
if (brokerRequest.isSetAggregationsInfo()) {
+ // Extract aggregation expressions (COUNT needs no column, so it is skipped)
for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
- String expression = AggregationFunctionUtils.getColumn(aggregationInfo);
- TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
- transformExpressionTree.getColumns(_projectionColumns);
- _expressions.add(transformExpressionTree);
+ addExpressionColumn(AggregationFunctionUtils.getColumn(aggregationInfo));
}
}
- // Process all group-by expressions
+ // Extract group-by expressions
if (brokerRequest.isSetGroupBy()) {
- for (String expression : brokerRequest.getGroupBy().getExpressions()) {
- TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
- transformExpressionTree.getColumns(_projectionColumns);
- _expressions.add(transformExpressionTree);
+ for (String column : brokerRequest.getGroupBy().getExpressions()) {
+ addExpressionColumn(column);
}
}
} else {
Selection selection = brokerRequest.getSelections();
- List<String> columns = selection.getSelectionColumns();
- if (columns.size() == 1 && columns.get(0).equals("*")) {
- columns = new ArrayList<>(indexSegment.getPhysicalColumnNames());
- }
- List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
- if (sortSequence == null) {
- // For selection only queries, select minimum number of documents. Fetch at least 1 document per
- // DocIdSetPlanNode's requirement.
- // TODO: Skip the filtering phase and document fetching for LIMIT 0 case
- _maxDocPerNextCall = Math.max(Math.min(selection.getSize(), _maxDocPerNextCall), 1);
+
+ // Extract selection expressions; for 'SELECT *' use the physical columns as identifiers without compiling
+ List<String> selectionColumns = selection.getSelectionColumns();
+ if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
+ for (String column : indexSegment.getPhysicalColumnNames()) {
+ _projectionColumns.add(column);
+ _expressions.add(new TransformExpressionTree(new IdentifierAstNode(column)));
+ }
} else {
- for (SelectionSort selectionSort : sortSequence) {
- columns.add(selectionSort.getColumn());
+ for (String column : selectionColumns) {
+ addExpressionColumn(column);
}
}
- for (String column : columns) {
- TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
- expression.getColumns(_projectionColumns);
- _expressions.add(expression);
+
+ // Extract order-by expressions and update maxDocPerNextCall (order-by columns are not projected for LIMIT 0)
+ if (selection.getSize() > 0) {
+ List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
+ if (sortSequence == null) {
+ // For selection-only queries (no ORDER BY), cap _maxDocPerNextCall at the LIMIT
+ _maxDocPerNextCall = Math.min(selection.getSize(), _maxDocPerNextCall);
+ } else {
+ for (SelectionSort selectionSort : sortSequence) {
+ addExpressionColumn(selectionSort.getColumn());
+ }
+ }
+ } else {
+ // For LIMIT 0 queries, fetch at least 1 document per DocIdSetPlanNode's requirement
+ // TODO: Skip the filtering phase and document fetching for LIMIT 0 case
+ _maxDocPerNextCall = 1;
}
}
}
+ private void addExpressionColumn(String column) {
+ TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
+ expression.getColumns(_projectionColumns); // presumably adds the expression's referenced columns to _projectionColumns
+ _expressions.add(expression);
+ }
+
@Override
public TransformOperator run() {
return new TransformOperator(_projectionPlanNode.run(), _expressions);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 87bd766..b5ecb86 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.selection;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -89,32 +90,32 @@ public class SelectionOperatorService {
* @return flexible {@link Comparator} for selection rows.
*/
private Comparator<Serializable[]> getTypeCompatibleComparator() {
- return (o1, o2) -> {
- int numOrderByExpressions = _sortSequence.size();
- for (int i = 0; i < numOrderByExpressions; i++) {
- Serializable v1 = o1[i];
- Serializable v2 = o2[i];
-
- int result;
- // Only compare single-value columns
- if (v1 instanceof Number) {
- result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
- } else if (v1 instanceof String) {
- result = ((String) v1).compareTo((String) v2);
- } else {
- continue;
- }
+ // Compare only single-value (non-array) order-by columns; numbers are compared as doubles for type compatibility
+ int numOrderByExpressions = _sortSequence.size();
+ List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+ for (int i = 0; i < numOrderByExpressions; i++) {
+ if (!_dataSchema.getColumnDataType(i).isArray()) {
+ valueIndexList.add(i);
+ }
+ }
- if (result != 0) {
- if (_sortSequence.get(i).isIsAsc()) {
- return -result;
- } else {
- return result;
- }
- }
+ int numValuesToCompare = valueIndexList.size();
+ int[] valueIndices = new int[numValuesToCompare];
+ Comparator[] valueComparators = new Comparator[numValuesToCompare];
+ for (int i = 0; i < numValuesToCompare; i++) {
+ int valueIndex = valueIndexList.get(i);
+ valueIndices[i] = valueIndex;
+ if (_dataSchema.getColumnDataType(valueIndex).isNumber()) { // use valueIndex: i indexes the filtered list, not the column
+ valueComparators[i] = Comparator.comparingDouble(Number::doubleValue);
+ } else {
+ valueComparators[i] = Comparator.naturalOrder();
}
- return 0;
- };
+ if (_sortSequence.get(valueIndex).isIsAsc()) {
+ valueComparators[i] = valueComparators[i].reversed();
+ }
+ }
+
+ return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index fe9b107..7919c32 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -23,6 +23,7 @@ import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -42,6 +43,7 @@ import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.util.ArrayCopyUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
/**
@@ -106,15 +108,23 @@ public class SelectionOperatorUtils {
}
if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
+ // For 'SELECT *', sort all physical columns so that the order is deterministic, and wrap each as an identifier without compiling
selectionColumns = new ArrayList<>(indexSegment.getPhysicalColumnNames());
- // Sort the columns so that the order is deterministic
selectionColumns.sort(null);
- }
- for (String selectionColumn : selectionColumns) {
- TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn);
- if (expressionSet.add(selectionExpression)) {
- expressions.add(selectionExpression);
+ for (String selectionColumn : selectionColumns) {
+ TransformExpressionTree selectionExpression =
+ new TransformExpressionTree(new IdentifierAstNode(selectionColumn));
+ if (expressionSet.add(selectionExpression)) {
+ expressions.add(selectionExpression);
+ }
+ }
+ } else { // Explicit selection list: compile each expression, de-duplicated via expressionSet
+ for (String selectionColumn : selectionColumns) {
+ TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn);
+ if (expressionSet.add(selectionExpression)) {
+ expressions.add(selectionExpression);
+ }
}
}
@@ -167,6 +177,7 @@ public class SelectionOperatorUtils {
/**
* Merge two partial results for selection queries with <code>ORDER BY</code>. (Server side)
+ * TODO: Compare rows with a type-compatible comparator (like SelectionOperatorService#getTypeCompatibleComparator)
*
* @param mergedRows partial results 1.
* @param rowsToMerge partial results 2.
@@ -568,4 +579,31 @@ public class SelectionOperatorUtils {
queue.offer(value);
}
}
+
+ /**
+ * Compares rows value by value at the given indices, in order, with the matching comparator; returns the first non-zero result, or 0.
+ */
+ public static class RowComparator implements Comparator<Serializable[]> {
+ private final int[] _valueIndices;
+ private final Comparator[] _valueComparators;
+
+ public RowComparator(int[] valueIndices, Comparator[] valueComparators) {
+ _valueIndices = valueIndices;
+ _valueComparators = valueComparators;
+ }
+
+ @SuppressWarnings("unchecked") // raw Comparator[]: each comparator's element type depends on its column
+ @Override
+ public int compare(Serializable[] o1, Serializable[] o2) {
+ int numValuesToCompare = _valueIndices.length;
+ for (int i = 0; i < numValuesToCompare; i++) {
+ int valueIndex = _valueIndices[i];
+ int result = _valueComparators[i].compare(o1[valueIndex], o2[valueIndex]);
+ if (result != 0) {
+ return result;
+ }
+ }
+ return 0;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org