You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/10/12 01:02:50 UTC

[incubator-pinot] branch selection_comparator_hotfix created (now 4f4ad6f)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch selection_comparator_hotfix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 4f4ad6f  Hotfix for Selection Comparator

This branch includes the following new commits:

     new 4f4ad6f  Hotfix for Selection Comparator

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Hotfix for Selection Comparator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch selection_comparator_hotfix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4f4ad6f1174387b0b051656f0fe201dc7901285e
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Fri Oct 11 17:58:52 2019 -0700

    Hotfix for Selection Comparator
    
    - Enhance the comparator for selection order-by queries to avoid doing row based switch
    - Enhance the 'SELECT *' to not compile the expression
    - Do not project order-by columns for 'LIMIT 0' (EmptySelection) case
---
 .../operator/query/SelectionOrderByOperator.java   | 87 +++++++++++-----------
 .../apache/pinot/core/plan/TransformPlanNode.java  | 64 +++++++++-------
 .../query/selection/SelectionOperatorService.java  | 49 ++++++------
 .../query/selection/SelectionOperatorUtils.java    | 50 +++++++++++--
 4 files changed, 150 insertions(+), 100 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index eee2ceb..f6cf5a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.query;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -88,51 +89,49 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl
   }
 
   private Comparator<Serializable[]> getComparator() {
-    return (o1, o2) -> {
-      int numOrderByExpressions = _sortSequence.size();
-      for (int i = 0; i < numOrderByExpressions; i++) {
-        // Only compare single-value columns
-        if (!_expressionMetadata[i].isSingleValue()) {
-          continue;
-        }
-
-        Serializable v1 = o1[i];
-        Serializable v2 = o2[i];
-
-        int result;
-        switch (_expressionMetadata[i].getDataType()) {
-          case INT:
-            result = ((Integer) v1).compareTo((Integer) v2);
-            break;
-          case LONG:
-            result = ((Long) v1).compareTo((Long) v2);
-            break;
-          case FLOAT:
-            result = ((Float) v1).compareTo((Float) v2);
-            break;
-          case DOUBLE:
-            result = ((Double) v1).compareTo((Double) v2);
-            break;
-          case STRING:
-            result = ((String) v1).compareTo((String) v2);
-            break;
-          case BYTES:
-            result = ByteArray.compare((byte[]) v1, (byte[]) v2);
-            break;
-          default:
-            throw new IllegalStateException();
-        }
-
-        if (result != 0) {
-          if (_sortSequence.get(i).isIsAsc()) {
-            return -result;
-          } else {
-            return result;
-          }
-        }
+    // Compare only single-value columns: collect the indices of the single-value order-by expressions
+    int numOrderByExpressions = _sortSequence.size();
+    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      if (_expressionMetadata[i].isSingleValue()) {
+        valueIndexList.add(i);
       }
-      return 0;
-    };
+    }
+
+    int numValuesToCompare = valueIndexList.size();
+    int[] valueIndices = new int[numValuesToCompare];
+    Comparator[] valueComparators = new Comparator[numValuesToCompare]; // one comparator per compared column
+    for (int i = 0; i < numValuesToCompare; i++) {
+      int valueIndex = valueIndexList.get(i);
+      valueIndices[i] = valueIndex;
+      switch (_expressionMetadata[valueIndex].getDataType()) { // type is resolved once here, not per row
+        case INT:
+          valueComparators[i] = (Comparator<Integer>) Integer::compare;
+          break;
+        case LONG:
+          valueComparators[i] = (Comparator<Long>) Long::compare;
+          break;
+        case FLOAT:
+          valueComparators[i] = (Comparator<Float>) Float::compare;
+          break;
+        case DOUBLE:
+          valueComparators[i] = (Comparator<Double>) Double::compare;
+          break;
+        case STRING:
+          valueComparators[i] = Comparator.naturalOrder();
+          break;
+        case BYTES:
+          valueComparators[i] = (Comparator<byte[]>) ByteArray::compare;
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+      if (_sortSequence.get(valueIndex).isIsAsc()) {
+        valueComparators[i] = valueComparators[i].reversed(); // ASC is reversed, presumably for PriorityQueue use - confirm
+      }
+    }
+
+    return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index d8ceebb..84d9287 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.plan;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -31,6 +30,7 @@ import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,48 +59,60 @@ public class TransformPlanNode implements PlanNode {
    */
   private void extractColumnsAndTransforms(BrokerRequest brokerRequest, IndexSegment indexSegment) {
     if (brokerRequest.isSetAggregationsInfo()) {
+      // Extract aggregation expressions (COUNT aggregations are skipped)
       for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
         if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
-          String expression = AggregationFunctionUtils.getColumn(aggregationInfo);
-          TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
-          transformExpressionTree.getColumns(_projectionColumns);
-          _expressions.add(transformExpressionTree);
+          addExpressionColumn(AggregationFunctionUtils.getColumn(aggregationInfo));
         }
       }
 
-      // Process all group-by expressions
+      // Extract group-by expressions
       if (brokerRequest.isSetGroupBy()) {
-        for (String expression : brokerRequest.getGroupBy().getExpressions()) {
-          TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
-          transformExpressionTree.getColumns(_projectionColumns);
-          _expressions.add(transformExpressionTree);
+        for (String column : brokerRequest.getGroupBy().getExpressions()) {
+          addExpressionColumn(column);
         }
       }
     } else {
       Selection selection = brokerRequest.getSelections();
-      List<String> columns = selection.getSelectionColumns();
-      if (columns.size() == 1 && columns.get(0).equals("*")) {
-        columns = new ArrayList<>(indexSegment.getPhysicalColumnNames());
-      }
-      List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
-      if (sortSequence == null) {
-        // For selection only queries, select minimum number of documents. Fetch at least 1 document per
-        // DocIdSetPlanNode's requirement.
-        // TODO: Skip the filtering phase and document fetching for LIMIT 0 case
-        _maxDocPerNextCall = Math.max(Math.min(selection.getSize(), _maxDocPerNextCall), 1);
+
+      // Extract selection expressions; 'SELECT *' wraps each physical column in an identifier node without compiling
+      List<String> selectionColumns = selection.getSelectionColumns();
+      if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
+        for (String column : indexSegment.getPhysicalColumnNames()) {
+          _projectionColumns.add(column);
+          _expressions.add(new TransformExpressionTree(new IdentifierAstNode(column)));
+        }
       } else {
-        for (SelectionSort selectionSort : sortSequence) {
-          columns.add(selectionSort.getColumn());
+        for (String column : selectionColumns) {
+          addExpressionColumn(column);
         }
       }
-      for (String column : columns) {
-        TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
-        expression.getColumns(_projectionColumns);
-        _expressions.add(expression);
+
+      // Extract order-by expressions and update maxDocPerNextCall (order-by is not projected when size <= 0)
+      if (selection.getSize() > 0) {
+        List<SelectionSort> sortSequence = selection.getSelectionSortSequence();
+        if (sortSequence == null) {
+          // For selection only queries, cap the documents fetched per call at the selection size
+          _maxDocPerNextCall = Math.min(selection.getSize(), _maxDocPerNextCall);
+        } else {
+          for (SelectionSort selectionSort : sortSequence) {
+            addExpressionColumn(selectionSort.getColumn());
+          }
+        }
+      } else {
+        // For LIMIT 0 queries, fetch at least 1 document per DocIdSetPlanNode's requirement
+        // TODO: Skip the filtering phase and document fetching for LIMIT 0 case
+        _maxDocPerNextCall = 1;
       }
     }
   }
 
+  private void addExpressionColumn(String column) { // compiles the expression and records it plus its columns
+    TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(column);
+    expression.getColumns(_projectionColumns);
+    _expressions.add(expression);
+  }
+
   @Override
   public TransformOperator run() {
     return new TransformOperator(_projectionPlanNode.run(), _expressions);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 87bd766..b5ecb86 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.selection;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
@@ -89,32 +90,32 @@ public class SelectionOperatorService {
    * @return flexible {@link Comparator} for selection rows.
    */
   private Comparator<Serializable[]> getTypeCompatibleComparator() {
-    return (o1, o2) -> {
-      int numOrderByExpressions = _sortSequence.size();
-      for (int i = 0; i < numOrderByExpressions; i++) {
-        Serializable v1 = o1[i];
-        Serializable v2 = o2[i];
-
-        int result;
-        // Only compare single-value columns
-        if (v1 instanceof Number) {
-          result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
-        } else if (v1 instanceof String) {
-          result = ((String) v1).compareTo((String) v2);
-        } else {
-          continue;
-        }
+    // Compare all single-value columns (array columns are skipped, so list position may differ from column index)
+    int numOrderByExpressions = _sortSequence.size();
+    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      if (!_dataSchema.getColumnDataType(i).isArray()) {
+        valueIndexList.add(i);
+      }
+    }
 
-        if (result != 0) {
-          if (_sortSequence.get(i).isIsAsc()) {
-            return -result;
-          } else {
-            return result;
-          }
-        }
+    int numValuesToCompare = valueIndexList.size();
+    int[] valueIndices = new int[numValuesToCompare];
+    Comparator[] valueComparators = new Comparator[numValuesToCompare];
+    for (int i = 0; i < numValuesToCompare; i++) {
+      int valueIndex = valueIndexList.get(i);
+      valueIndices[i] = valueIndex;
+      if (_dataSchema.getColumnDataType(valueIndex).isNumber()) { // look up by column index, not list position
+        valueComparators[i] = Comparator.comparingDouble(Number::doubleValue); // any Number subtype compares as double
+      } else {
+        valueComparators[i] = Comparator.naturalOrder();
       }
-      return 0;
-    };
+      if (_sortSequence.get(valueIndex).isIsAsc()) {
+        valueComparators[i] = valueComparators[i].reversed(); // same sign flip the replaced lambda applied for ASC
+      }
+    }
+
+    return new SelectionOperatorUtils.RowComparator(valueIndices, valueComparators);
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index fe9b107..7919c32 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -23,6 +23,7 @@ import java.text.DecimalFormat;
 import java.text.DecimalFormatSymbols;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,6 +43,7 @@ import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.util.ArrayCopyUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
 
 
 /**
@@ -106,15 +108,23 @@ public class SelectionOperatorUtils {
     }
 
     if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
+      // For 'SELECT *', sort all physical columns so that the order is deterministic
       selectionColumns = new ArrayList<>(indexSegment.getPhysicalColumnNames());
-      // Sort the columns so that the order is deterministic
       selectionColumns.sort(null);
-    }
 
-    for (String selectionColumn : selectionColumns) {
-      TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn);
-      if (expressionSet.add(selectionExpression)) {
-        expressions.add(selectionExpression);
+      for (String selectionColumn : selectionColumns) {
+        TransformExpressionTree selectionExpression =
+            new TransformExpressionTree(new IdentifierAstNode(selectionColumn));
+        if (expressionSet.add(selectionExpression)) {
+          expressions.add(selectionExpression);
+        }
+      }
+    } else {
+      for (String selectionColumn : selectionColumns) {
+        TransformExpressionTree selectionExpression = TransformExpressionTree.compileToExpressionTree(selectionColumn);
+        if (expressionSet.add(selectionExpression)) {
+          expressions.add(selectionExpression);
+        }
       }
     }
 
@@ -167,6 +177,7 @@ public class SelectionOperatorUtils {
 
   /**
    * Merge two partial results for selection queries with <code>ORDER BY</code>. (Server side)
+   * TODO: Should use type compatible comparator to compare the rows
    *
    * @param mergedRows partial results 1.
    * @param rowsToMerge partial results 2.
@@ -568,4 +579,31 @@ public class SelectionOperatorUtils {
       queue.offer(value);
     }
   }
+
+  /**
+   * Helper class to compare rows: applies the i-th value comparator to column valueIndices[i]; first non-zero result wins.
+   */
+  public static class RowComparator implements Comparator<Serializable[]> {
+    private final int[] _valueIndices;
+    private final Comparator[] _valueComparators;
+
+    public RowComparator(int[] valueIndices, Comparator[] valueComparators) {
+      _valueIndices = valueIndices;
+      _valueComparators = valueComparators;
+    }
+
+    @SuppressWarnings("unchecked") // the raw Comparator[] elements are invoked on Serializable values
+    @Override
+    public int compare(Serializable[] o1, Serializable[] o2) {
+      int numValuesToCompare = _valueIndices.length;
+      for (int i = 0; i < numValuesToCompare; i++) {
+        int valueIndex = _valueIndices[i];
+        int result = _valueComparators[i].compare(o1[valueIndex], o2[valueIndex]);
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0; // rows are equal on every compared column
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org