You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/05/19 00:19:58 UTC
[incubator-pinot] branch master updated: Fix rules to include DateTime column (#6937) (#6938)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 98891a3 Fix rules to include DateTime column (#6937) (#6938)
98891a3 is described below
commit 98891a3b7ee4a9578caf285a914ca5f9014bf083
Author: Sharayu <ga...@gmail.com>
AuthorDate: Tue May 18 17:19:43 2021 -0700
Fix rules to include DateTime column (#6937) (#6938)
---
.../controller/recommender/io/InputManager.java | 22 +++-----
.../recommender/rules/impl/BloomFilterRule.java | 12 ++--
.../recommender/rules/impl/FlagQueryRule.java | 13 +++--
.../NoDictionaryOnHeapDictionaryJointRule.java | 2 +-
.../rules/impl/PinotTablePartitionRule.java | 2 +-
.../rules/io/params/RecommenderConstants.java | 8 +--
.../utils/QueryInvertedSortedIndexRecommender.java | 6 +-
.../controller/recommender/TestConfigEngine.java | 23 ++++++--
.../BloomFilterInputWithDateTimeColumn.json | 64 ++++++++++++++++++++++
.../resources/recommenderInput/FlagQueryInput.json | 15 ++++-
10 files changed, 124 insertions(+), 43 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
index 2635744..152e96d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
@@ -229,9 +229,8 @@ public class InputManager {
_metricNames = new HashSet<>(_schema.getMetricNames());
_dateTimeNames = new HashSet<>(_schema.getDateTimeNames());
- String primaryTimeCol;
- if ((primaryTimeCol = getPrimaryTimeCol()) != null) {
- _dateTimeNames.add(primaryTimeCol);
+ if (_schema.getTimeFieldSpec() != null) {
+ _dateTimeNames.add(_schema.getTimeFieldSpec().getName());
}
_intToColNameMap = new String[_dimNames.size() + _metricNames.size() + _dateTimeNames.size()];
@@ -444,15 +443,10 @@ public class InputManager {
return _colNameToIntMap.size();
}
- //TODO: Currently Pinot is using only ONE time column specified by TimeFieldSpec
- // Change the implementation after the new schema with multiple _dateTimeNames is in use
- // Return the time column used in server level filtering
- public String getPrimaryTimeCol() {
- if (_schema.getTimeFieldSpec() != null) {
- return _schema.getTimeFieldSpec().getName();
- } else {
- return null;
- }
+ // Provides set of time columns.
+ // This could be at most 1 from TimeFieldSpec and 1 or more from DatetimeFieldSpec
+ public Set<String> getTimeColumns() {
+ return _dateTimeNames;
}
public Set<String> getColNamesNoDictionary() {
@@ -580,8 +574,8 @@ public class InputManager {
return _dimNames.contains(colName);
}
- public boolean isPrimaryDateTime(String colName) {
- return colName != null && colName.equalsIgnoreCase(getPrimaryTimeCol());
+ public boolean isTimeOrDateTimeColumn(String colName) {
+ return colName != null && getTimeColumns().stream().anyMatch(d -> colName.equalsIgnoreCase(d));
}
public void estimateSizePerRecord()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java
index 8d3977e..18af61d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/BloomFilterRule.java
@@ -48,8 +48,8 @@ public class BloomFilterRule extends AbstractRule {
@Override
public void run() {
- int numDims = _input.getNumDims();
- double[] weights = new double[numDims];
+ int numCols = _input.getNumCols();
+ double[] weights = new double[numCols];
AtomicDouble totalWeight = new AtomicDouble(0);
// For each query, find out the dimensions used in 'EQ'
@@ -65,7 +65,7 @@ public class BloomFilterRule extends AbstractRule {
});
LOGGER.debug("Weight: {}, Total {}", weights, totalWeight);
- for (int i = 0; i < numDims; i++) {
+ for (int i = 0; i < numCols; i++) {
String dimName = _input.intToColName(i);
if (((weights[i] / totalWeight.get()) > _params.THRESHOLD_MIN_PERCENT_EQ_BLOOMFILTER)
//The partitioned dimension should be frequently > P used
@@ -108,10 +108,6 @@ public class BloomFilterRule extends AbstractRule {
String colName = lhs.toString();
if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
LOGGER.trace("Skipping the function {}", colName);
- } else if (_input.isPrimaryDateTime(colName)) {
- LOGGER.trace("Skipping the DateTime column {}", colName);
- } else if (!_input.isDim(colName)) {
- LOGGER.error("Error: Column {} should not appear in filter", colName);
} else if (filterContext.getPredicate().getType() == Predicate.Type.EQ) {
ret.add(_input.colNameToInt(colName));
}
@@ -120,6 +116,6 @@ public class BloomFilterRule extends AbstractRule {
}
private FixedLenBitset MUTABLE_EMPTY_SET() {
- return new FixedLenBitset(_input.getNumDims());
+ return new FixedLenBitset(_input.getNumCols());
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java
index 5e4ea3a..3839093 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/FlagQueryRule.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pinot.controller.recommender.rules.impl;
import java.util.HashSet;
@@ -36,7 +37,7 @@ import static org.apache.pinot.controller.recommender.rules.io.params.Recommende
* Flag the queries that are not valid:
* Flag the queries with LIMIT value higher than a threshold.
* Flag the queries that are not using any filters.
- * Flag the queries that are not using any filters.
+ * Flag the queries that are not using any time filters.
*/
public class FlagQueryRule extends AbstractRule {
private final Logger LOGGER = LoggerFactory.getLogger(FlagQueryRule.class);
@@ -61,11 +62,15 @@ public class FlagQueryRule extends AbstractRule {
//Flag the queries that are not using any filters.
_output.getFlaggedQueries().add(query, WARNING_NO_FILTERING);
}
- else { //Flag the queries that are not using any filters.
+ else { //Flag the queries that are not using any time filters.
Set<String> usedCols = new HashSet<>();
queryContext.getFilter().getColumns(usedCols);
- if (!usedCols.contains(_input.getPrimaryTimeCol())){
- _output.getFlaggedQueries().add(query, WARNING_NO_TIME_COL);
+ Set<String> timeCols = _input.getTimeColumns();
+ if(!timeCols.isEmpty()) {
+ usedCols.retainAll(timeCols);
+ if (usedCols.isEmpty()) {
+ _output.getFlaggedQueries().add(query, WARNING_NO_TIME_COL);
+ }
}
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
index 221434d..9839d11 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
@@ -222,7 +222,7 @@ public class NoDictionaryOnHeapDictionaryJointRule extends AbstractRule {
Predicate predicate = filterContext.getPredicate();
ExpressionContext lhs = predicate.getLhs();
String colName = lhs.toString();
- if (lhs.getType() == ExpressionContext.Type.FUNCTION || _input.isPrimaryDateTime(colName)) {
+ if (lhs.getType() == ExpressionContext.Type.FUNCTION || _input.isTimeOrDateTimeColumn(colName)) {
LOGGER.trace("Skipping this column {}", colName);
} else if (!_input.isDim(colName)) {
LOGGER.error("Error: Column {} should not appear in filter", colName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
index 433d5cd..7b7ddb3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
@@ -217,7 +217,7 @@ public class PinotTablePartitionRule extends AbstractRule {
String colName = lhs.toString();
if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
LOGGER.trace("Skipping the function {}", colName);
- } else if (_input.isPrimaryDateTime(colName)) {
+ } else if (_input.isTimeOrDateTimeColumn(colName)) {
LOGGER.trace("Skipping the DateTime column {}", colName);
return null;
} else if (!_input.isDim(colName)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
index 096c38f..ebb6070 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
@@ -77,10 +77,10 @@ public class RecommenderConstants {
public static class FlagQueryRuleParams{
public static final long DEFAULT_THRESHOLD_MAX_LIMIT_SIZE = 100000;
- public static final String WARNING_NO_FILTERING = "Warning: No filtering in ths query";
- public static final String WARNING_NO_TIME_COL = "Warning: No time column used in ths query";
- public static final String WARNING_TOO_LONG_LIMIT = "Warning: The size of LIMIT is longer than " + DEFAULT_THRESHOLD_MAX_LIMIT_SIZE;
- public static final String ERROR_INVALID_QUERY = "Error: query not able to parse, skipped";
+ public static final String WARNING_NO_FILTERING = "Warning: Query seems to scan the entire table. No filters are used in the query. Please verify if filters are not needed.";
+ public static final String WARNING_NO_TIME_COL = "Warning: No time column used in filter in the query. Table with time columns typically use it in filters to make the queries more selective.";
+ public static final String WARNING_TOO_LONG_LIMIT = "Warning: Please verify if you need to pull out huge number of records for this query. Consider using smaller limit than " + DEFAULT_THRESHOLD_MAX_LIMIT_SIZE;
+ public static final String ERROR_INVALID_QUERY = "Error: Invalid query syntax. Please fix the query";
}
public static class RealtimeProvisioningRule {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
index 8fa8c4c..80ecfc5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
@@ -531,10 +531,6 @@ public class QueryInvertedSortedIndexRecommender {
.setRecommendationPriorityEnum(RecommendationPriorityEnum.NON_CANDIDATE_SCAN) // won't recommend index
.setnESI(nESI).setPercentSelected(_params.PERCENT_SELECT_FOR_FUNCTION).setnESIWithIdx(nESI).build();
}
- // Skip time columns
- else if (_inputManager.isPrimaryDateTime(colName)) {
- return null;
- }
// Not a valid dimension name
else if (!_inputManager.isDim(colName)) {
LOGGER.error("Error: Column {} should not appear in filter", colName);
@@ -759,4 +755,4 @@ public class QueryInvertedSortedIndexRecommender {
private FixedLenBitset MUTABLE_EMPTY_SET() {
return new FixedLenBitset(_numDimsIndexApplicable);
}
-}
\ No newline at end of file
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
index a050ec7..4b0aaad 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
@@ -88,7 +88,7 @@ public class TestConfigEngine {
assertEquals(_input.getAverageDataLen("g"), 100);
assertTrue(_input.isSingleValueColumn("j"));
assertFalse(_input.isSingleValueColumn("i"));
- assertEquals(_input.getPrimaryTimeCol(),"t");
+ assertTrue(_input.getTimeColumns().contains("t"));
}
@Test
@@ -176,7 +176,6 @@ public class TestConfigEngine {
loadInput("recommenderInput/InvalidInput2.json");
}
-
@Test
void testFlagQueryRule()
throws InvalidInputException, IOException {
@@ -185,8 +184,13 @@ public class TestConfigEngine {
AbstractRule abstractRule =
RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.FlagQueryRule, _input, output);
abstractRule.run();
- assertEquals(output.getFlaggedQueries().getFlaggedQueries().toString(),
- "{select g from tableName LIMIT 1000000000=Warning: The size of LIMIT is longer than 100000 | Warning: No filtering in ths query, not a valid query=Error: query not able to parse, skipped, select f from tableName=Warning: No filtering in ths query, select f from tableName where a =3=Warning: No time column used in ths query}");
+
+ assertFalse(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName where x = 2"));
+ assertFalse(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName where t = 3"));
+ assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select * from tableName"));
+ assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName"));
+ assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select f from tableName where a =3"));
+ assertTrue(output.getFlaggedQueries().getFlaggedQueries().containsKey("select g from tableName LIMIT 1000000000"));
}
@Test
@@ -212,6 +216,17 @@ public class TestConfigEngine {
}
@Test
+ void testBloomFilterRuleWithTimeSpecColumn()
+ throws InvalidInputException, IOException {
+ loadInput("recommenderInput/BloomFilterInputWithDateTimeColumn.json");
+ ConfigManager output = new ConfigManager();
+ AbstractRule abstractRule =
+ RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.BloomFilterRule, _input, output);
+ abstractRule.run();
+ assertEquals(output.getIndexConfig().getBloomFilterColumns().toString(), "[b, t, x]");
+ }
+
+ @Test
void testNoDictionaryOnHeapDictionaryJointRule()
throws InvalidInputException, IOException {
loadInput("recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json");
diff --git a/pinot-controller/src/test/resources/recommenderInput/BloomFilterInputWithDateTimeColumn.json b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInputWithDateTimeColumn.json
new file mode 100644
index 0000000..2259fcc
--- /dev/null
+++ b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInputWithDateTimeColumn.json
@@ -0,0 +1,64 @@
+{
+ "schema":{
+ "schemaName": "tableSchema",
+ "dimensionFieldSpecs": [
+ {
+ "name": "b",
+ "dataType": "DOUBLE",
+ "cardinality":6,
+ "singleValueField": false,
+ "numValuesPerEntry":1.5
+ },
+ {
+ "name": "c",
+ "dataType": "FLOAT",
+ "cardinality":7,
+ "numValuesPerEntry":1
+ },
+ {
+ "name": "d",
+ "dataType": "STRING",
+ "cardinality": 41,
+ "singleValueField": false,
+ "numValuesPerEntry": 2,
+ "averageLength": 27
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "p",
+ "dataType": "DOUBLE",
+ "cardinality":10000,
+ "numValuesPerEntry":1
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "x",
+ "dataType": "INT",
+ "format": "1:DAYS:EPOCH",
+ "granularity": "1:DAYS",
+ "cardinality": 3
+ }
+ ],
+ "timeFieldSpec": {
+ "incomingGranularitySpec": {
+ "dataType": "INT",
+ "name": "t",
+ "cardinality": 1,
+ "timeType": "DAYS",
+ "numValuesPerEntry":1
+ }
+ }
+ },
+ "queriesWithWeights":{
+ "select d from tableName where (x=0 and b=1) or t=16": 3.5,
+ "select d from tableName where (x=12 and p=4) ": 3,
+ "select d from tableName where (t=7 and b=12) ": 1.5
+ },
+ "qps": 250,
+ "numMessagesPerSecInKafkaTopic":1000,
+ "numRecordsPerPush":1000000000,
+ "tableType": "HYBRID",
+ "latencySLA": 500
+}
diff --git a/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json b/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
index 0f256bc..a820e76 100644
--- a/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
@@ -115,6 +115,15 @@
"numValuesPerEntry":1
}
],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "x",
+ "dataType": "INT",
+ "format": "1:DAYS:EPOCH",
+ "granularity": "1:DAYS",
+ "cardinality": 1000
+ }
+ ],
"timeFieldSpec": {
"incomingGranularitySpec": {
"dataType": "INT",
@@ -128,8 +137,10 @@
"queriesWithWeights":{
"select f from tableName": 0.1,
"select f from tableName where a =3": 0.1,
- "not a valid query": 0.1,
- "select g from tableName LIMIT 1000000000": 0.1
+ "select * from tableName": 0.1,
+ "select g from tableName LIMIT 1000000000": 0.1,
+ "select f from tableName where x = 2": 0.2,
+ "select f from tableName where t = 3": 0.3
},
"numRecordsPerPush":1000000000
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org