You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ma...@apache.org on 2020/05/12 22:38:10 UTC

[incubator-pinot] branch master updated: Address comments from #5316. (#5377)

This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e325ea  Address comments from #5316. (#5377)
1e325ea is described below

commit 1e325eabb65054fc586fda7e15f9839f487a6dcb
Author: Mayank Shrivastava <ms...@linkedin.com>
AuthorDate: Tue May 12 15:38:03 2020 -0700

    Address comments from #5316. (#5377)
    
    1. Add predicate evaluators for all types instead of LONG/DOUBLE/STRING only.
    2. Update DistinctCountThetaSketchTest to test more data types.
    3. Add hashCode() and equals() for Predicate class.
---
 .../org/apache/pinot/core/common/Predicate.java    | 18 ++++++
 ...istinctCountThetaSketchAggregationFunction.java | 70 +++++++++++++++++++---
 .../queries/DistinctCountThetaSketchTest.java      | 35 +++++------
 3 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
index 0ad1a4d..d2e2bd3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.common;
 
 import java.util.List;
+import java.util.Objects;
 import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.core.common.predicate.EqPredicate;
@@ -114,4 +115,21 @@ public abstract class Predicate {
     }
     return predicate;
   }
+
+  @Override
+  public boolean equals(Object o) { // Value equality: two predicates are equal when lhs, rhs and type all match.
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Predicate)) { // Also rejects null; instanceof lets any two Predicate subclasses compare by the base fields.
+      return false;
+    }
+    Predicate predicate = (Predicate) o;
+    return Objects.equals(lhs, predicate.lhs) && Objects.equals(rhs, predicate.rhs) && type == predicate.type;
+  }
+
+  @Override
+  public int hashCode() { // Hashes exactly the fields equals() compares, so equal predicates get equal hash codes.
+    return Objects.hash(lhs, rhs, type);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index b7463a0..be8dc2e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -146,6 +146,14 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
       Union union = result.get(predicate);
       switch (valueType) {
         case INT:
+          int[] intValues = blockValSet.getIntValuesSV();
+          for (int i = 0; i < length; i++) {
+            if (predicateEvaluator.applySV(intValues[i])) {
+              union.update(sketches[i]);
+            }
+          }
+          break;
+
         case LONG:
           long[] longValues = blockValSet.getLongValuesSV();
           for (int i = 0; i < length; i++) {
@@ -156,6 +164,14 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
           break;
 
         case FLOAT:
+          float[] floatValues = blockValSet.getFloatValuesSV();
+          for (int i = 0; i < length; i++) {
+            if (predicateEvaluator.applySV(floatValues[i])) {
+              union.update(sketches[i]);
+            }
+          }
+          break;
+
         case DOUBLE:
           double[] doubleValues = blockValSet.getDoubleValuesSV();
           for (int i = 0; i < length; i++) {
@@ -195,6 +211,16 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
       Map<String, Union> result;
       switch (valueType) {
         case INT:
+          int[] intValues = blockValSet.getIntValuesSV();
+          for (int i = 0; i < length; i++) {
+            if (predicateEvaluator.applySV(intValues[i])) {
+              result = getDefaultResult(groupByResultHolder, groupKeyArray[i], _predicateStrings);
+              Union union = result.get(predicate);
+              union.update(sketches[i]);
+            }
+          }
+          break;
+
         case LONG:
           long[] longValues = blockValSet.getLongValuesSV();
           for (int i = 0; i < length; i++) {
@@ -207,6 +233,16 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
           break;
 
         case FLOAT:
+          float[] floatValues = blockValSet.getFloatValuesSV();
+          for (int i = 0; i < length; i++) {
+            if (predicateEvaluator.applySV(floatValues[i])) {
+              result = getDefaultResult(groupByResultHolder, groupKeyArray[i], _predicateStrings);
+              Union union = result.get(predicate);
+              union.update(sketches[i]);
+            }
+          }
+          break;
+
         case DOUBLE:
           double[] doubleValues = blockValSet.getDoubleValuesSV();
           for (int i = 0; i < length; i++) {
@@ -251,6 +287,19 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
       Map<String, Union> result;
       switch (valueType) {
         case INT:
+          int[] intValues = blockValSet.getIntValuesSV();
+
+          for (int i = 0; i < length; i++) {
+            if (predicateEvaluator.applySV(intValues[i])) {
+
+              for (int groupKey : groupKeysArray[i]) {
+                result = getDefaultResult(groupByResultHolder, groupKey, _predicateStrings);
+                Union union = result.get(predicate);
+                union.update(sketches[i]);
+              }
+            }
+          }
+
         case LONG:
           long[] longValues = blockValSet.getLongValuesSV();
 
@@ -267,6 +316,20 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
           break;
 
         case FLOAT:
+          float[] floatValues = blockValSet.getFloatValuesSV();
+
+          for (int i = 0; i < length; i++) {
+            if (predicateEvaluator.applySV(floatValues[i])) {
+
+              for (int groupKey : groupKeysArray[i]) {
+                result = getDefaultResult(groupByResultHolder, groupKey, _predicateStrings);
+                Union union = result.get(predicate);
+                union.update(sketches[i]);
+              }
+            }
+          }
+          break;
+
         case DOUBLE:
           double[] doubleValues = blockValSet.getDoubleValuesSV();
 
@@ -612,13 +675,6 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
         return _predicateEvaluator;
       }
 
-      // Theta-sketch does not work on INT and FLOAT.
-      if (dataType == FieldSpec.DataType.INT) {
-        dataType = FieldSpec.DataType.LONG;
-      } else if (dataType == FieldSpec.DataType.FLOAT) {
-        dataType = FieldSpec.DataType.DOUBLE;
-      }
-
       _predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(_predicate, null, dataType);
       return _predicateEvaluator;
     }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
index 20ca264..2ea387c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
@@ -74,13 +74,6 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
 
   private static final String THETA_SKETCH_COLUMN = "colTS";
   private static final String DISTINCT_COLUMN = "distinctColumn";
-  private static final List<String> ALL_COLUMNS;
-
-  static {
-    ALL_COLUMNS = Arrays.asList("colA", "colB", "colC", DISTINCT_COLUMN, THETA_SKETCH_COLUMN);
-  }
-
-  private static final List<String> DIMENSIONS = ALL_COLUMNS.subList(0, 3);
 
   private static Random RANDOM = new Random(RANDOM_SEED);
   protected static final int MAX_CARDINALITY = 5; // 3 columns will lead to at most 125 groups
@@ -126,23 +119,23 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
   }
 
   private void testThetaSketches(boolean groupBy, boolean sql) {
-    List<String> predicates = Collections.singletonList("colA = 'colA_1'");
+    List<String> predicates = Collections.singletonList("colA = 1");
     String whereClause = Strings.join(predicates, " or ");
     testQuery(whereClause, null, predicates, whereClause, groupBy, sql);
 
     // Test Intersection (AND)
-    predicates = Arrays.asList("colA = 'colA_1'", "colB >= 'colB_1'", "colC <> 'colC_1'");
+    predicates = Arrays.asList("colA = 1", "colB >= 2.0", "colC <> 'colC_1'");
     whereClause = Strings.join(predicates, " and ");
     testQuery(whereClause, "nominalEntries=1001", predicates, whereClause, groupBy, sql);
 
     // Test Union (OR)
-    predicates = Arrays.asList("colA = 'colA_1'", "colB = 'colB_2'");
+    predicates = Arrays.asList("colA = 1", "colB = 1.9");
     whereClause = Strings.join(predicates, " or ");
     testQuery(whereClause, " nominalEntries =1001 ", predicates, whereClause, groupBy, sql);
 
     // Test complex predicates
     predicates =
-        Arrays.asList("colA in ('colA_1', 'colA_2')", "colB not in ('colB_1')", "colC between 'colC_1' and 'colC_5'");
+        Arrays.asList("colA in (1, 2)", "colB not in (3.0)", "colC between 'colC_1' and 'colC_5'");
     whereClause =
         predicates.get(0) + " and " + predicates.get(1) + " or " + predicates.get(0) + " and " + predicates.get(2);
     testQuery(whereClause, "nominalEntries =  1001", predicates, whereClause, groupBy, sql);
@@ -306,12 +299,17 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
       stringBuilder.setLength(0);
       HashMap<String, Object> valueMap = new HashMap<>();
 
-      for (String dimension : DIMENSIONS) {
-        String value = dimension + "_" + (i % (1 + RANDOM.nextInt(MAX_CARDINALITY)));
-        valueMap.put(dimension, value);
+      int value = (i % (1 + RANDOM.nextInt(MAX_CARDINALITY)));
+      valueMap.put("colA", value);
+      stringBuilder.append(value);
 
-        stringBuilder.append(value);
-      }
+      value = (i % (1 + RANDOM.nextInt(MAX_CARDINALITY)));
+      valueMap.put("colB", (i % (1 + RANDOM.nextInt(MAX_CARDINALITY))));
+      stringBuilder.append(value);
+
+      String sValue = "colC" + "_" + (i % (1 + RANDOM.nextInt(MAX_CARDINALITY)));
+      valueMap.put("colC", sValue);
+      stringBuilder.append(sValue);
 
       String distinctValue = stringBuilder.toString();
       valueMap.put(DISTINCT_COLUMN, distinctValue);
@@ -344,7 +342,10 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
 
   private Schema buildSchema() {
     Schema schema = new Schema();
-    DIMENSIONS.forEach(column -> schema.addField(new DimensionFieldSpec(column, FieldSpec.DataType.STRING, true)));
+
+    schema.addField(new DimensionFieldSpec("colA", FieldSpec.DataType.INT, true));
+    schema.addField(new DimensionFieldSpec("colB", FieldSpec.DataType.DOUBLE, true));
+    schema.addField(new DimensionFieldSpec("colC", FieldSpec.DataType.STRING, true));
 
     schema.addField(new DimensionFieldSpec(DISTINCT_COLUMN, FieldSpec.DataType.STRING, true));
     schema.addField(new MetricFieldSpec(THETA_SKETCH_COLUMN, FieldSpec.DataType.BYTES));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org