You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/10/14 20:48:00 UTC

[incubator-pinot] branch master updated: Enhance Selection Comparator (#4702)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7540702  Enhance Selection Comparator (#4702)
7540702 is described below

commit 7540702c7751dc3f6129b9c2e5b6d42e8f47cd0d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Oct 14 13:47:49 2019 -0700

    Enhance Selection Comparator (#4702)
    
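    - Enhance the comparator for selection order-by queries: resolve one comparator per order-by column up front instead of switching on the data type for every row comparison
    - Enhance 'SELECT *' to construct identifier expressions directly instead of compiling each column name
    - Do not project order-by columns for the 'LIMIT 0' (EmptySelection) case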
---
 .../operator/query/SelectionOrderByOperator.java   | 87 +++++++++++-----------
 .../apache/pinot/core/plan/TransformPlanNode.java  | 79 +++++++++++---------
 .../query/selection/SelectionOperatorService.java  | 49 ++++++------
 .../query/selection/SelectionOperatorUtils.java    | 61 +++++++++++++--
 4 files changed, 166 insertions(+), 110 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 422638a..133c7d9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.query;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -80,51 +81,49 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
   }
 
   private Comparator<Serializable[]> getComparator() {
-    return (o1, o2) -> {
-      int numOrderByExpressions = _sortSequence.size();
-      for (int i = 0; i < numOrderByExpressions; i++) {
-        // Only compare single-value columns
-        if (!_expressionMetadata[i].isSingleValue()) {
-          continue;
-        }
-
-        Serializable v1 = o1[i];
-        Serializable v2 = o2[i];
-
-        int result;
-        switch (_expressionMetadata[i].getDataType()) {
-          case INT:
-            result = ((Integer) v1).compareTo((Integer) v2);
-            break;
-          case LONG:
-            result = ((Long) v1).compareTo((Long) v2);
-            break;
-          case FLOAT:
-            result = ((Float) v1).compareTo((Float) v2);
-            break;
-          case DOUBLE:
-            result = ((Double) v1).compareTo((Double) v2);
-            break;
-          case STRING:
-            result = ((String) v1).compareTo((String) v2);
-            break;
-          case BYTES:
-            result = ByteArray.compare((byte[]) v1, (byte[]) v2);
-            break;
-          default:
-            throw new IllegalStateException();
-        }
-
-        if (result != 0) {
-          if (_sortSequence.get(i).isIsAsc()) {
-            return -result;
-          } else {
-            return result;
-          }
-        }
+    // Compare all single-value columns
+    int numOrderByExpressions = _sortSequence.size();
+    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      if (_expressionMetadata[i].isSingleValue()) {
+        valueIndexList.add(i);
       }
-      return 0;
-    };
+    }
+
+    int numValuesToCompare = valueIndexList.size();
+    int[] valueIndices = new int[numValuesToCompare];
+    Comparator[] valueComparators = new Comparator[numValuesToCompare];
+    for (int i = 0; i < numValuesToCompare; i++) {
+      int valueIndex = valueIndexList.get(i);
+      valueIndices[i] = valueIndex;
+      switch (_expressionMetadata[valueIndex].getDataType()) {
+        case INT:
+          valueComparators[i] = (Comparator<Integer>) Integer::compare;
+          break;
+        case LONG:
+          valueComparators[i] = (Comparator<Long>) Long::compare;
+          break;
+        case FLOAT:
+          valueComparators[i] = (Comparator<Float>) Float::compare;
+          break;
+        case DOUBLE:
+          valueComparators[i] = (Comparator<Double>) Double::compare;
+          break;
+        case STRING:
+          valueComparators[i] = Comparator.naturalOrder();
+          break;
+        case BYTES:
+          valueComparators[i] = (Comparator<byte[]>) ByteArray::compare;
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+      if (_sortSequence.get(valueIndex).isIsAsc()) {
+        valueComparators[i] = valueComparators[i].reversed();
+      }
+    }
+
+    return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index 6d4b7ad..a504e39 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.plan;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -32,6 +31,7 @@ import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,60 +60,67 @@ public class TransformPlanNode implements PlanNode {
    */
   private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegment indexSegment) {
     if (brokerRequest.isSetAggregationsInfo()) {
+      // Extract aggregation expressions
       for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
         if (aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.DISTINCT.getName())) {
-          // handle DISTINCT (col1, col2 ...) as an aggregate function
-          String multiColumnExpression = AggregationFunctionUtils.getColumn(aggregationInfo);
-          String[] distinctColumnExpressions =
-              multiColumnExpression.split(FunctionCallAstNode.DISTINCT_MULTI_COLUMN_SEPARATOR);
-          for (String distinctColumnExpr : distinctColumnExpressions) {
-            TransformExpressionTree transformExpressionTree =
-                TransformExpressionTree.compileToExpressionTree(distinctColumnExpr);
-            transformExpressionTree.getColumns(_projectionColumns);
-            _expressions.add(transformExpressionTree);
+          // 'DISTINCT(col1, col2 ...)' is modeled as one single aggregation function
+          String[] distinctColumns = AggregationFunctionUtils.getColumn(aggregationInfo)
+              .split(FunctionCallAstNode.DISTINCT_MULTI_COLUMN_SEPARATOR);
+          for (String column : distinctColumns) {
+            addExpressionColumn(column);
           }
         } else if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
-          // handle all other aggregate functions (except count(*))
-          String expression = AggregationFunctionUtils.getColumn(aggregationInfo);
-          TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
-          transformExpressionTree.getColumns(_projectionColumns);
-          _expressions.add(transformExpressionTree);
+          addExpressionColumn(AggregationFunctionUtils.getColumn(aggregationInfo));
         }
       }
 
-      // Process all group-by expressions
+      // Extract group-by expressions
       if (brokerRequest.isSetGroupBy()) {
-        for (String expression : brokerRequest.getGroupBy().getExpressions()) {
-          TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
-          transformExpressionTree.getColumns(_projectionColumns);
-          _expressions.add(transformExpressionTree);
+        for (String column : brokerRequest.getGroupBy().getExpressions()) {
+          addExpressionColumn(column);
         }
       }
     } else {
       Selection selection = brokerRequest.getSelections();
-      List<String> columns = selection.getSelectionColumns();
-      if (columns.size() == 1 && columns.get(0).equals("*")) {
-        columns = new ArrayList<>(indexSegment.getPhysicalColumnNames());
-      }
-      List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
-      if (sortSequence == null) {
-        // For selection only queries, select minimum number of documents. Fetch at least 1 document per
-        // DocIdSetPlanNode's requirement.
-        // TODO: Skip the filtering phase and document fetching for LIMIT 0 case
-        _maxDocPerNextCall = Math.max(Math.min(selection.getSize(), _maxDocPerNextCall), 1);
+
+      // Extract selection expressions
+      List<String> selectionColumns = selection.getSelectionColumns();
+      if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
+        for (String column : indexSegment.getPhysicalColumnNames()) {
+          _projectionColumns.add(column);
+          _expressions.add(new TransformExpressionTree(new IdentifierAstNode(column)));
+        }
       } else {
-        for (SelectionSort selectionSort : sortSequence) {
-          columns.add(selectionSort.getColumn());
+        for (String column : selectionColumns) {
+          addExpressionColumn(column);
         }
       }
-      for (String column : columns) {
-        TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
-        expression.getColumns(_projectionColumns);
-        _expressions.add(expression);
+
+      // Extract order-by expressions and update maxDocPerNextCall
+      if (selection.getSize() > 0) {
+        List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
+        if (sortSequence == null) {
+          // For selection only queries, select minimum number of documents
+          _maxDocPerNextCall = Math.min(selection.getSize(), _maxDocPerNextCall);
+        } else {
+          for (SelectionSort selectionSort : sortSequence) {
+            addExpressionColumn(selectionSort.getColumn());
+          }
+        }
+      } else {
+        // For LIMIT 0 queries, fetch at least 1 document per DocIdSetPlanNode's requirement
+        // TODO: Skip the filtering phase and document fetching for LIMIT 0 case
+        _maxDocPerNextCall = 1;
       }
     }
   }
 
+  private void addExpressionColumn(String column) {
+    TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
+    expression.getColumns(_projectionColumns);
+    _expressions.add(expression);
+  }
+
   @Override
   public TransformOperator run() {
     return new TransformOperator(_projectionPlanNode.run(), _expressions);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 87bd766..b5ecb86 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.selection;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
@@ -89,32 +90,32 @@ public class SelectionOperatorService {
    * @return flexible {@link Comparator} for selection rows.
    */
   private Comparator<Serializable[]> getTypeCompatibleComparator() {
-    return (o1, o2) -> {
-      int numOrderByExpressions = _sortSequence.size();
-      for (int i = 0; i < numOrderByExpressions; i++) {
-        Serializable v1 = o1[i];
-        Serializable v2 = o2[i];
-
-        int result;
-        // Only compare single-value columns
-        if (v1 instanceof Number) {
-          result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
-        } else if (v1 instanceof String) {
-          result = ((String) v1).compareTo((String) v2);
-        } else {
-          continue;
-        }
+    // Only single-value order-by columns are compared; array (multi-value) columns are skipped
+    int numOrderByExpressions = _sortSequence.size();
+    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      if (!_dataSchema.getColumnDataType(i).isArray()) {
+        valueIndexList.add(i);
+      }
+    }
 
-        if (result != 0) {
-          if (_sortSequence.get(i).isIsAsc()) {
-            return -result;
-          } else {
-            return result;
-          }
-        }
+    int numValuesToCompare = valueIndexList.size();
+    int[] valueIndices = new int[numValuesToCompare];
+    Comparator[] valueComparators = new Comparator[numValuesToCompare];
+    for (int i = 0; i < numValuesToCompare; i++) {
+      int valueIndex = valueIndexList.get(i);
+      valueIndices[i] = valueIndex;
+      if (_dataSchema.getColumnDataType(valueIndex).isNumber()) {
+        valueComparators[i] = Comparator.comparingDouble(Number::doubleValue);
+      } else {
+        valueComparators[i] = Comparator.naturalOrder();
       }
-      return 0;
-    };
+      if (_sortSequence.get(valueIndex).isIsAsc()) {
+        valueComparators[i] = valueComparators[i].reversed();
+      }
+    }
+
+    return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index fe9b107..1d3eb8f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -23,6 +23,7 @@ import java.text.DecimalFormat;
 import java.text.DecimalFormatSymbols;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,6 +43,7 @@ import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.util.ArrayCopyUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
 
 
 /**
@@ -106,15 +108,23 @@ public class SelectionOperatorUtils {
     }
 
     if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
+      // For 'SELECT *', sort all physical columns so that the order is deterministic
       selectionColumns = new ArrayList<>(indexSegment.getPhysicalColumnNames());
-      // Sort the columns so that the order is deterministic
       selectionColumns.sort(null);
-    }
 
-    for (String selectionColumn : selectionColumns) {
-      TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn);
-      if (expressionSet.add(selectionExpression)) {
-        expressions.add(selectionExpression);
+      for (String selectionColumn : selectionColumns) {
+        TransformExpressionTree selectionExpression =
+            new TransformExpressionTree(new IdentifierAstNode(selectionColumn));
+        if (expressionSet.add(selectionExpression)) {
+          expressions.add(selectionExpression);
+        }
+      }
+    } else {
+      for (String selectionColumn : selectionColumns) {
+        TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn);
+        if (expressionSet.add(selectionExpression)) {
+          expressions.add(selectionExpression);
+        }
       }
     }
 
@@ -167,6 +177,7 @@ public class SelectionOperatorUtils {
 
   /**
    * Merge two partial results for selection queries with <code>ORDER BY</code>. (Server side)
+   * TODO: Should use type compatible comparator to compare the rows
    *
    * @param mergedRows partial results 1.
    * @param rowsToMerge partial results 2.
@@ -568,4 +579,42 @@ public class SelectionOperatorUtils {
       queue.offer(value);
     }
   }
+
+  /**
+   * Helper Comparator class to compare rows.
+   * <p>Two arguments are expected to construct the comparator:
+   * <ul>
+   *   <li>
+   *     Value indices: an array of column indices in each row where the values need to be compared (only the
+   *     single-value order-by columns need to be compared)
+   *   </li>
+   *   <li>
+   *     Value comparators: an array of Comparator, where each element is the Comparator for the corresponding column in
+   *     the value indices array
+   *   </li>
+   * </ul>
+   */
+  public static class RowComparator implements Comparator<Serializable[]> {
+    private final int[] _valueIndices;
+    private final Comparator[] _valueComparators;
+
+    public RowComparator(int[] valueIndices, Comparator[] valueComparators) {
+      _valueIndices = valueIndices;
+      _valueComparators = valueComparators;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compare(Serializable[] o1, Serializable[] o2) {
+      int numValuesToCompare = _valueIndices.length;
+      for (int i = 0; i < numValuesToCompare; i++) {
+        int valueIndex = _valueIndices[i];
+        int result = _valueComparators[i].compare(o1[valueIndex], o2[valueIndex]);
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org