You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original archive page and is not preserved in this text.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/05/19 00:19:58 UTC

[incubator-pinot] branch master updated: Fix rules to include DateTime column (#6937) (#6938)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 98891a3  Fix rules to include DateTime column (#6937) (#6938)
98891a3 is described below

commit 98891a3b7ee4a9578caf285a914ca5f9014bf083
Author: Sharayu <ga...@gmail.com>
AuthorDate: Tue May 18 17:19:43 2021 -0700

    Fix rules to include DateTime column (#6937) (#6938)
---
 .../controller/recommender/io/InputManager.java    | 22 +++-----
 .../recommender/rules/impl/BloomFilterRule.java    | 12 ++--
 .../recommender/rules/impl/FlagQueryRule.java      | 13 +++--
 .../NoDictionaryOnHeapDictionaryJointRule.java     |  2 +-
 .../rules/impl/PinotTablePartitionRule.java        |  2 +-
 .../rules/io/params/RecommenderConstants.java      |  8 +--
 .../utils/QueryInvertedSortedIndexRecommender.java |  6 +-
 .../controller/recommender/TestConfigEngine.java   | 23 ++++++--
 .../BloomFilterInputWithDateTimeColumn.json        | 64 ++++++++++++++++++++++
 .../resources/recommenderInput/FlagQueryInput.json | 15 ++++-
 10 files changed, 124 insertions(+), 43 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
index 2635744..152e96d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
@@ -229,9 +229,8 @@ public class InputManager {
     _metricNames = new HashSet<>(_schema.getMetricNames());
     _dateTimeNames = new HashSet<>(_schema.getDateTimeNames());
 
-    String primaryTimeCol;
-    if ((primaryTimeCol = getPrimaryTimeCol()) != null) {
-      _dateTimeNames.add(primaryTimeCol);
+    if (_schema.getTimeFieldSpec() != null) {
+      _dateTimeNames.add(_schema.getTimeFieldSpec().getName());
     }
 
     _intToColNameMap = new String[_dimNames.size() + _metricNames.size() + _dateTimeNames.size()];
@@ -444,15 +443,10 @@ public class InputManager {
     return _colNameToIntMap.size();
   }
 
-  //TODO: Currently Pinot is using only ONE time column specified by TimeFieldSpec
-  // Change the implementation after the new schema with multiple _dateTimeNames is in use
-  // Return the time column used in server level filtering
-  public String getPrimaryTimeCol() {
-    if (_schema.getTimeFieldSpec() != null) {
-      return _schema.getTimeFieldSpec().getName();
-    } else {
-      return null;
-    }
+  // Provides set of time columns.
+  // This could be at most 1 from TimeFieldSpec and 1 or more from DatetimeFieldSpec
+  public Set<String> getTimeColumns() {
+    return _dateTimeNames;
   }
 
   public Set<String> getColNamesNoDictionary() {
@@ -580,8 +574,8 @@ public class InputManager {
     return _dimNames.contains(colName);
   }
 
-  public boolean isPrimaryDateTime(String colName) {
-    return colName != null && colName.equalsIgnoreCase(getPrimaryTimeCol());
+  public boolean isTimeOrDateTimeColumn(String colName) {
+    return colName != null && getTimeColumns().stream().anyMatch(d -> colName.equalsIgnoreCase(d));
   }
 
   public void estimateSizePerRecord()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java
index 8d3977e..18af61d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java
@@ -48,8 +48,8 @@ public class  BloomFilterRule extends AbstractRule {
 
   @Override
   public void run() {
-    int numDims = _input.getNumDims();
-    double[] weights = new double[numDims];
+    int numCols = _input.getNumCols();
+    double[] weights = new double[numCols];
     AtomicDouble totalWeight = new AtomicDouble(0);
 
     // For each query, find out the dimensions used in 'EQ'
@@ -65,7 +65,7 @@ public class  BloomFilterRule extends AbstractRule {
     });
     LOGGER.debug("Weight: {}, Total {}", weights, totalWeight);
 
-    for (int i = 0; i < numDims; i++) {
+    for (int i = 0; i < numCols; i++) {
       String dimName = _input.intToColName(i);
       if (((weights[i] / totalWeight.get()) > _params.THRESHOLD_MIN_PERCENT_EQ_BLOOMFILTER)
           //The partitioned dimension should be frequently > P used
@@ -108,10 +108,6 @@ public class  BloomFilterRule extends AbstractRule {
       String colName = lhs.toString();
       if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
         LOGGER.trace("Skipping the function {}", colName);
-      } else if (_input.isPrimaryDateTime(colName)) {
-        LOGGER.trace("Skipping the DateTime column {}", colName);
-      } else if (!_input.isDim(colName)) {
-        LOGGER.error("Error: Column {} should not appear in filter", colName);
       } else if (filterContext.getPredicate().getType() == Predicate.Type.EQ) {
         ret.add(_input.colNameToInt(colName));
       }
@@ -120,6 +116,6 @@ public class  BloomFilterRule extends AbstractRule {
   }
 
   private FixedLenBitset MUTABLE_EMPTY_SET() {
-    return new FixedLenBitset(_input.getNumDims());
+    return new FixedLenBitset(_input.getNumCols());
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java
index 5e4ea3a..3839093 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pinot.controller.recommender.rules.impl;
 
 import java.util.HashSet;
@@ -36,7 +37,7 @@ import static org.apache.pinot.controller.recommender.rules.io.params.Recommende
  * Flag the queries that are not valid:
  *    Flag the queries with LIMIT value higher than a threshold.
  *    Flag the queries that are not using any filters.
- *    Flag the queries that are not using any filters.
+ *    Flag the queries that are not using any time filters.
  */
 public class FlagQueryRule extends AbstractRule {
   private final Logger LOGGER = LoggerFactory.getLogger(FlagQueryRule.class);
@@ -61,11 +62,15 @@ public class FlagQueryRule extends AbstractRule {
         //Flag the queries that are not using any filters.
         _output.getFlaggedQueries().add(query, WARNING_NO_FILTERING);
       }
-      else { //Flag the queries that are not using any filters.
+      else { //Flag the queries that are not using any time filters.
         Set<String> usedCols = new HashSet<>();
         queryContext.getFilter().getColumns(usedCols);
-        if (!usedCols.contains(_input.getPrimaryTimeCol())){
-          _output.getFlaggedQueries().add(query, WARNING_NO_TIME_COL);
+        Set<String> timeCols = _input.getTimeColumns();
+        if(!timeCols.isEmpty()) {
+          usedCols.retainAll(timeCols);
+          if (usedCols.isEmpty()) {
+            _output.getFlaggedQueries().add(query, WARNING_NO_TIME_COL);
+          }
         }
       }
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
index 221434d..9839d11 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
@@ -222,7 +222,7 @@ public class NoDictionaryOnHeapDictionaryJointRule extends AbstractRule {
       Predicate predicate = filterContext.getPredicate();
       ExpressionContext lhs = predicate.getLhs();
       String colName = lhs.toString();
-      if (lhs.getType() == ExpressionContext.Type.FUNCTION || _input.isPrimaryDateTime(colName)) {
+      if (lhs.getType() == ExpressionContext.Type.FUNCTION || _input.isTimeOrDateTimeColumn(colName)) {
         LOGGER.trace("Skipping this column {}", colName);
       } else if (!_input.isDim(colName)) {
         LOGGER.error("Error: Column {} should not appear in filter", colName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
index 433d5cd..7b7ddb3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
@@ -217,7 +217,7 @@ public class PinotTablePartitionRule extends AbstractRule {
       String colName = lhs.toString();
       if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
         LOGGER.trace("Skipping the function {}", colName);
-      } else if (_input.isPrimaryDateTime(colName)) {
+      } else if (_input.isTimeOrDateTimeColumn(colName)) {
         LOGGER.trace("Skipping the DateTime column {}", colName);
         return null;
       } else if (!_input.isDim(colName)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
index 096c38f..ebb6070 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
@@ -77,10 +77,10 @@ public class RecommenderConstants {
 
   public static class FlagQueryRuleParams{
     public static final long DEFAULT_THRESHOLD_MAX_LIMIT_SIZE = 100000;
-    public static final String WARNING_NO_FILTERING = "Warning: No filtering in ths query";
-    public static final String WARNING_NO_TIME_COL = "Warning: No time column used in ths query";
-    public static final String WARNING_TOO_LONG_LIMIT = "Warning: The size of LIMIT is longer than " + DEFAULT_THRESHOLD_MAX_LIMIT_SIZE;
-    public static final String ERROR_INVALID_QUERY = "Error: query not able to parse, skipped";
+    public static final String WARNING_NO_FILTERING = "Warning: Query seems to scan the entire table. No filters are used in the query. Please verify if filters are not needed.";
+    public static final String WARNING_NO_TIME_COL = "Warning: No time column used in filter in the query. Table with time columns typically use it in filters to make the queries more selective.";
+    public static final String WARNING_TOO_LONG_LIMIT = "Warning: Please verify if you need to pull out huge number of records for this query. Consider using smaller limit than " + DEFAULT_THRESHOLD_MAX_LIMIT_SIZE;
+    public static final String ERROR_INVALID_QUERY = "Error: Invalid query syntax. Please fix the query";
   }
 
   public static class RealtimeProvisioningRule {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
index 8fa8c4c..80ecfc5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
@@ -531,10 +531,6 @@ public class QueryInvertedSortedIndexRecommender {
           .setRecommendationPriorityEnum(RecommendationPriorityEnum.NON_CANDIDATE_SCAN) // won't recommend index
           .setnESI(nESI).setPercentSelected(_params.PERCENT_SELECT_FOR_FUNCTION).setnESIWithIdx(nESI).build();
     }
-    // Skip time columns
-    else if (_inputManager.isPrimaryDateTime(colName)) {
-      return null;
-    }
     // Not a valid dimension name
     else if (!_inputManager.isDim(colName)) {
       LOGGER.error("Error: Column {} should not appear in filter", colName);
@@ -759,4 +755,4 @@ public class QueryInvertedSortedIndexRecommender {
   private FixedLenBitset MUTABLE_EMPTY_SET() {
     return new FixedLenBitset(_numDimsIndexApplicable);
   }
-}
\ No newline at end of file
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
index a050ec7..4b0aaad 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
@@ -88,7 +88,7 @@ public class TestConfigEngine {
     assertEquals(_input.getAverageDataLen("g"), 100);
     assertTrue(_input.isSingleValueColumn("j"));
     assertFalse(_input.isSingleValueColumn("i"));
-    assertEquals(_input.getPrimaryTimeCol(),"t");
+    assertTrue(_input.getTimeColumns().contains("t"));
   }
 
   @Test
@@ -176,7 +176,6 @@ public class TestConfigEngine {
     loadInput("recommenderInput/InvalidInput2.json");
   }
 
-
   @Test
   void testFlagQueryRule()
       throws InvalidInputException, IOException {
@@ -185,8 +184,13 @@ public class TestConfigEngine {
     AbstractRule abstractRule =
         RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.FlagQueryRule, _input, output);
     abstractRule.run();
-    assertEquals(output.getFlaggedQueries().getFlaggedQueries().toString(),
-        "{select g from tableName LIMIT 1000000000=Warning: The size of LIMIT is longer than 100000 | Warning: No filtering in ths query, not a valid query=Error: query not able to parse, skipped, select f from tableName=Warning: No filtering in ths query, select f from tableName where a =3=Warning: No time column used in ths query}");
+
+    assertFalse(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName where x = 2"));
+    assertFalse(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName where t = 3"));
+    assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select * from tableName"));
+    assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName"));
+    assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName where a =3"));
+    assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select g from tableName LIMIT 1000000000"));
   }
 
   @Test
@@ -212,6 +216,17 @@ public class TestConfigEngine {
   }
 
   @Test
+  void testBloomFilterRuleWithTimeSpecColumn()
+      throws InvalidInputException, IOException {
+    loadInput("recommenderInput/BloomFilterInputWithDateTimeColumn.json");
+    ConfigManager output = new ConfigManager();
+    AbstractRule abstractRule =
+        RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.BloomFilterRule, _input, output);
+    abstractRule.run();
+    assertEquals(output.getIndexConfig().getBloomFilterColumns().toString(), "[b, t, x]");
+  }
+
+  @Test
   void testNoDictionaryOnHeapDictionaryJointRule()
       throws InvalidInputException, IOException {
     loadInput("recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json");
diff --git a/pinot-controller/src/test/resources/recommenderInput/BloomFilterInputWithDateTimeColumn.json b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInputWithDateTimeColumn.json
new file mode 100644
index 0000000..2259fcc
--- /dev/null
+++ b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInputWithDateTimeColumn.json
@@ -0,0 +1,64 @@
+{
+  "schema":{
+    "schemaName": "tableSchema",
+    "dimensionFieldSpecs": [
+      {
+        "name": "b",
+        "dataType": "DOUBLE",
+        "cardinality":6,
+        "singleValueField": false,
+        "numValuesPerEntry":1.5
+      },
+      {
+        "name": "c",
+        "dataType": "FLOAT",
+        "cardinality":7,
+        "numValuesPerEntry":1
+      },
+      {
+        "name": "d",
+        "dataType": "STRING",
+        "cardinality": 41,
+        "singleValueField": false,
+        "numValuesPerEntry": 2,
+        "averageLength": 27
+      }
+    ],
+    "metricFieldSpecs": [
+      {
+        "name": "p",
+        "dataType": "DOUBLE",
+        "cardinality":10000,
+        "numValuesPerEntry":1
+      }
+    ],
+    "dateTimeFieldSpecs": [
+      {
+        "name": "x",
+        "dataType": "INT",
+        "format": "1:DAYS:EPOCH",
+        "granularity": "1:DAYS",
+        "cardinality": 3
+      }
+    ],
+    "timeFieldSpec": {
+      "incomingGranularitySpec": {
+        "dataType": "INT",
+        "name": "t",
+        "cardinality": 1,
+        "timeType": "DAYS",
+        "numValuesPerEntry":1
+      }
+    }
+  },
+  "queriesWithWeights":{
+    "select d from tableName where (x=0 and b=1) or t=16": 3.5,
+    "select d from tableName where (x=12 and p=4) ": 3,
+    "select d from tableName where (t=7 and b=12) ": 1.5
+  },
+  "qps": 250,
+  "numMessagesPerSecInKafkaTopic":1000,
+  "numRecordsPerPush":1000000000,
+  "tableType": "HYBRID",
+  "latencySLA": 500
+}
diff --git a/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json b/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
index 0f256bc..a820e76 100644
--- a/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
@@ -115,6 +115,15 @@
         "numValuesPerEntry":1
       }
     ],
+    "dateTimeFieldSpecs": [
+      {
+        "name": "x",
+        "dataType": "INT",
+        "format": "1:DAYS:EPOCH",
+        "granularity": "1:DAYS",
+        "cardinality": 1000
+      }
+    ],
     "timeFieldSpec": {
       "incomingGranularitySpec": {
         "dataType": "INT",
@@ -128,8 +137,10 @@
   "queriesWithWeights":{
     "select f from tableName": 0.1,
     "select f from tableName where a =3": 0.1,
-    "not a valid query": 0.1,
-    "select g from tableName LIMIT 1000000000": 0.1
+    "select * from tableName": 0.1,
+    "select g from tableName LIMIT 1000000000": 0.1,
+    "select f from tableName where x = 2": 0.2,
+    "select f from tableName where t = 3": 0.3
   },
   "numRecordsPerPush":1000000000
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org