You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/08/21 21:03:24 UTC

[incubator-pinot] branch master updated: Refactored code for overwritten configs (#5875)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e9775  Refactored code for overwritten configs (#5875)
e7e9775 is described below

commit e7e9775bddedf650d1ff197506d43089aa5d4f53
Author: JasperJiaGuo <ja...@gmail.com>
AuthorDate: Fri Aug 21 17:00:37 2020 -0400

    Refactored code for overwritten configs (#5875)
    
    Fixed an issue where the engine may crash if empty query patterns are provided. Added test case for that.
    
    Only recommend inverted indices if a sorted index is present in the overwritten config
    
    Reduced the time complexity of combination generation
    
    1. Fixed a bug where the engine would crash when parsing X NOT IN ('x')
    2. The code no longer complains when no time spec appears in the schema
---
 .../controller/recommender/io/InputManager.java    | 68 +++++++++---------
 .../recommender/rules/RulesToExecute.java          |  2 +-
 .../rules/impl/InvertedSortedIndexJointRule.java   | 80 +++++++++++++---------
 .../recommender/rules/impl/KafkaPartitionRule.java |  2 +-
 .../NoDictionaryOnHeapDictionaryJointRule.java     | 13 ++--
 .../rules/impl/PinotTablePartitionRule.java        | 37 ++++++----
 .../recommender/rules/io/configs/IndexConfig.java  | 14 ++--
 .../rules/io/configs/PartitionConfig.java          | 19 +++++
 .../params/InvertedSortedIndexJointRuleParams.java |  6 +-
 .../rules/io/params/PartitionRuleParams.java       |  2 +-
 .../rules/io/params/RecommenderConstants.java      | 12 ++--
 .../utils/QueryInvertedSortedIndexRecommender.java | 43 +++++-------
 .../controller/recommender/TestConfigEngine.java   | 11 ++-
 .../recommenderInput/BloomFilterInput.json         |  2 +-
 .../recommenderInput/DataSizeCalculationInput.json |  2 +-
 ...titionRuleInput.json => EmptyQueriesInput.json} | 69 +++----------------
 .../recommenderInput/KafkaPartitionRuleInput.json  |  2 +-
 .../recommenderInput/KafkaPartitionRuleInput2.json |  2 +-
 ...NoDictionaryOnHeapDictionaryJointRuleInput.json |  2 +-
 .../recommenderInput/SortedInvertedIndexInput.json |  2 +-
 20 files changed, 197 insertions(+), 193 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
index 0085061..8a64892 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
@@ -26,25 +26,14 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.controller.recommender.exceptions.InvalidInputException;
 import org.apache.pinot.controller.recommender.io.metadata.FieldMetadata;
 import org.apache.pinot.controller.recommender.io.metadata.SchemaWithMetaData;
 import org.apache.pinot.controller.recommender.rules.RulesToExecute;
-import org.apache.pinot.controller.recommender.rules.io.params.BloomFilterRuleParams;
 import org.apache.pinot.controller.recommender.rules.io.params.FlagQueryRuleParams;
-import org.apache.pinot.controller.recommender.rules.io.params.InvertedSortedIndexJointRuleParams;
-import org.apache.pinot.controller.recommender.rules.io.params.NoDictionaryOnHeapDictionaryJointRuleParams;
-import org.apache.pinot.controller.recommender.rules.io.params.PartitionRuleParams;
+import org.apache.pinot.controller.recommender.rules.io.params.*;
 import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
@@ -59,6 +48,10 @@ import org.apache.pinot.sql.parsers.SqlCompilationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static java.lang.Math.max;
 import static java.lang.Math.pow;
 import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.*;
@@ -76,10 +69,10 @@ public class InputManager {
   /******************************Deserialized from input json*********************************/
   // Basic input fields
 
-  public Long _qps;
-  public Long _numMessagesPerSecInKafKaTopic; // messages per sec for kafka to consume
-  public Long _numRecordsPerPush; // records per push for offline part of a table
-  public Long _latencySLA; // latency sla in ms
+  public Long _qps = DEFAULT_QPS;
+  public Long _numMessagesPerSecInKafkaTopic = DEFAULT_NUM_MESSAGES_PER_SEC_IN_KAFKA_TOPIC; // messages per sec for kafka to consume
+  public Long _numRecordsPerPush = DEFAULT_NUM_RECORDS_PER_PUSH; // records per push for offline part of a table
+  public Long _latencySLA = DEFAULT_LATENCY_SLA; // latency sla in ms
 
   public RulesToExecute _rulesToExecute = new RulesToExecute(); // dictates which rules to execute
   public Schema _schema = new Schema();
@@ -96,7 +89,7 @@ public class InputManager {
   // If the cardinality given by the customer is the global cardinality for the dataset (even potential data)
   // If true, the cardinality will be regulated see regulateCardinality()
   // TODO: Set to dsiabled for now, will discuss this in the next PR
-  public boolean useCardinalityNormalization = DEFAULT_USE_CARDINALITY_NORMALIZATION;
+  public boolean _useCardinalityNormalization = DEFAULT_USE_CARDINALITY_NORMALIZATION;
 
 
 
@@ -148,14 +141,14 @@ public class InputManager {
     reorderDimsAndBuildMap();
     registerColnameFieldType();
     validateQueries();
-    if (useCardinalityNormalization){
+    if (_useCardinalityNormalization){
       regulateCardinalityForAll();
     }
   }
   private void regulateCardinalityForAll(){
     double sampleSize;
     if (getTableType().equalsIgnoreCase(REALTIME)){
-      sampleSize = getSegmentFlushTime() * getNumMessagesPerSecInKafKaTopic();
+      sampleSize = getSegmentFlushTime() * getNumMessagesPerSecInKafkaTopic();
     }
     else{
       sampleSize = getNumRecordsPerPush();
@@ -199,14 +192,14 @@ public class InputManager {
   private void reorderDimsAndBuildMap()
       throws InvalidInputException {
 
-    Set<String> sortedColumn = _overWrittenConfigs.getIndexConfig().getSortedColumn();
+    String sortedColumn = _overWrittenConfigs.getIndexConfig().getSortedColumn();
     Set<String> invertedIndexColumns = _overWrittenConfigs.getIndexConfig().getInvertedIndexColumns();
     Set<String> rangeIndexColumns = _overWrittenConfigs.getIndexConfig().getRangeIndexColumns();
     Set<String> noDictionaryColumns = _overWrittenConfigs.getIndexConfig().getNoDictionaryColumns();
 
     /*Validate if there's conflict between NoDictionaryColumns and dimNamesWithAnyIndex*/
     Set<String> dimNamesWithAnyIndex = new HashSet<>();
-    dimNamesWithAnyIndex.addAll(sortedColumn);
+    dimNamesWithAnyIndex.add(sortedColumn);
     dimNamesWithAnyIndex.addAll(invertedIndexColumns);
     dimNamesWithAnyIndex.addAll(rangeIndexColumns);
     for (String colName : noDictionaryColumns) {
@@ -228,17 +221,17 @@ public class InputManager {
     _dimNames = new HashSet<>(_schema.getDimensionNames());
     _metricNames = new HashSet<>(_schema.getMetricNames());
     _dateTimeNames = new HashSet<>(_schema.getDateTimeNames());
-    try {
-      _dateTimeNames.add(getPrimaryTimeCol());
-    } catch (NullPointerException e) {
-      throw new InvalidInputException("No TimeFieldSpec specified in the schema!");
+
+    String primaryTimeCol;
+    if ((primaryTimeCol = getPrimaryTimeCol()) != null) {
+      _dateTimeNames.add(primaryTimeCol);
     }
 
     _intToColNameMap = new String[_dimNames.size() + _metricNames.size() + _dateTimeNames.size()];
     _colNameToIntMap = new HashMap<>();
 
     _dimNamesInvertedSortedIndexApplicable = new HashSet<>(_dimNames);
-    _dimNamesInvertedSortedIndexApplicable.removeAll(sortedColumn);
+    _dimNamesInvertedSortedIndexApplicable.remove(sortedColumn);
     _dimNamesInvertedSortedIndexApplicable.removeAll(invertedIndexColumns);
     _dimNamesInvertedSortedIndexApplicable.removeAll(noDictionaryColumns);
 
@@ -280,7 +273,7 @@ public class InputManager {
 
   @JsonSetter(nulls = Nulls.SKIP)
   public void setUseCardinalityNormalization(boolean cardinalityGlobal) {
-    useCardinalityNormalization = cardinalityGlobal;
+    _useCardinalityNormalization = cardinalityGlobal;
   }
 
   @JsonSetter(nulls = Nulls.SKIP)
@@ -330,8 +323,8 @@ public class InputManager {
   }
 
   @JsonSetter(nulls = Nulls.SKIP)
-  public void setNumMessagesPerSecInKafKaTopic(long numMessagesPerSecInKafKaTopic) {
-    _numMessagesPerSecInKafKaTopic = numMessagesPerSecInKafKaTopic;
+  public void setNumMessagesPerSecInKafkaTopic(long numMessagesPerSecInKafkaTopic) {
+    _numMessagesPerSecInKafkaTopic = numMessagesPerSecInKafkaTopic;
   }
 
   @JsonSetter(nulls = Nulls.SKIP)
@@ -385,7 +378,7 @@ public class InputManager {
   }
 
   public boolean isUseCardinalityNormalization() {
-    return useCardinalityNormalization;
+    return _useCardinalityNormalization;
   }
 
   public Set<String> getParsedQueries() {
@@ -442,9 +435,14 @@ public class InputManager {
   }
 
   //TODO: Currently Pinot is using only ONE time column specified by TimeFieldSpec
-  //TODO: Change the implementation after the new schema with multiple _dateTimeNames is in use
+  // Change the implementation after the new schema with multiple _dateTimeNames is in use
+  // Return the time column used in server level filtering
   public String getPrimaryTimeCol() {
-    return _schema.getTimeFieldSpec().getName();
+    if (_schema.getTimeFieldSpec() != null) {
+      return _schema.getTimeFieldSpec().getName();
+    } else {
+      return null;
+    }
   }
 
   public Set<String> getColNamesNoDictionary() {
@@ -471,8 +469,8 @@ public class InputManager {
     return _tableType;
   }
 
-  public long getNumMessagesPerSecInKafKaTopic() {
-    return _numMessagesPerSecInKafKaTopic;
+  public long getNumMessagesPerSecInKafkaTopic() {
+    return _numMessagesPerSecInKafkaTopic;
   }
 
   public long getNumRecordsPerPush() {
@@ -610,7 +608,7 @@ public class InputManager {
   }
 
   public boolean isPrimaryDateTime(String colName) {
-    return getPrimaryTimeCol().equalsIgnoreCase(colName);
+    return colName!=null && colName.equalsIgnoreCase(getPrimaryTimeCol());
   }
 
   public void estimateSizePerRecord()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java
index bb8657d..3ca8cc0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java
@@ -137,7 +137,7 @@ public class RulesToExecute {
   // Be careful with the sequence, each rule can execute individually
   // but a rule may depend on its previous rule when they both fired
   public enum Rule {
-    FlagQueryRule, // Should always fire to exclude invalid queries
+    FlagQueryRule, 
     KafkaPartitionRule,
     InvertedSortedIndexJointRule,
     NoDictionaryOnHeapDictionaryJointRule, // NoDictionaryOnHeapDictionaryJointRule must go after InvertedSortedIndexJointRule since we do not recommend NoDictionary on cols with indices
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/InvertedSortedIndexJointRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/InvertedSortedIndexJointRule.java
index a6415cc..b37d99a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/InvertedSortedIndexJointRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/InvertedSortedIndexJointRule.java
@@ -18,23 +18,20 @@
  */
 package org.apache.pinot.controller.recommender.rules.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
-import org.apache.pinot.controller.recommender.rules.utils.QueryInvertedSortedIndexRecommender;
 import org.apache.pinot.controller.recommender.io.ConfigManager;
 import org.apache.pinot.controller.recommender.io.InputManager;
 import org.apache.pinot.controller.recommender.rules.AbstractRule;
 import org.apache.pinot.controller.recommender.rules.io.params.InvertedSortedIndexJointRuleParams;
+import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
 import org.apache.pinot.controller.recommender.rules.utils.PredicateParseResult;
+import org.apache.pinot.controller.recommender.rules.utils.QueryInvertedSortedIndexRecommender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+import java.util.stream.Collectors;
+
 
 /**
  * Recommend one sorted index and inverted indices
@@ -124,8 +121,8 @@ public class InvertedSortedIndexJointRule extends AbstractRule {
     }
 
     if (!dimNameWeightPairRank.isEmpty()) {
-      if (_input.getOverWrittenConfigs().getIndexConfig().getSortedColumn().size()
-          > 0) { // if an overwritten sorted index presents
+      if (_input.getOverWrittenConfigs().getIndexConfig().isSortedColumnOverwritten()) {
+        // if an overwritten sorted index presents
         dimNameWeightPairRank.forEach(pair -> {
           _output.getIndexConfig().getInvertedIndexColumns()
               .add(pair.getLeft()); // Add recommendations to inverted index set
@@ -149,12 +146,11 @@ public class InvertedSortedIndexJointRule extends AbstractRule {
             .max(Comparator.comparing(pair -> // pick dimension with highest Cardinality as sorted index
                 _input.getCardinality(pair.getLeft())));
         if (sortedColumn.isPresent()) {
-          _output.getIndexConfig().getSortedColumn().add(sortedColumn.get().getLeft());
+          _output.getIndexConfig().setSortedColumn(sortedColumn.get().getLeft());
           dimNameWeightPairRank.stream().filter(pair -> pair != sortedColumn.get())
               .forEach(pair -> _output.getIndexConfig().getInvertedIndexColumns().add(pair.getLeft()));
         } else {
-          dimNameWeightPairRank
-              .forEach(pair -> _output.getIndexConfig().getInvertedIndexColumns().add(pair.getLeft()));
+          dimNameWeightPairRank.forEach(pair -> _output.getIndexConfig().getInvertedIndexColumns().add(pair.getLeft()));
         }
       }
     }
@@ -174,6 +170,7 @@ public class InvertedSortedIndexJointRule extends AbstractRule {
       PredicateParseResult currentCombination = evaluateCombination(n, r, parsedQuery);
       LOGGER.debug("findOptimalCombination: currentCombination: {}", currentCombination);
       double ratio = (optimalCombinationResult.getnESIWithIdx() - currentCombination.getnESIWithIdx()) / _totalNESI;
+      LOGGER.debug("ratio {}", ratio);
       if (ratio > _params.THRESHOLD_RATIO_MIN_GAIN_DIFF_BETWEEN_ITERATION) {
         optimalCombinationResult = currentCombination;
         iterationsWithoutGain = 0;
@@ -197,28 +194,39 @@ public class InvertedSortedIndexJointRule extends AbstractRule {
    * {@link PredicateParseResult#getnESI()} the nESI before applying any index
    * {@link PredicateParseResult#getnESIWithIdx()} the estimated nESI after applying optimal indices
    */
-  public PredicateParseResult evaluateCombination(int n, int r, List<List<PredicateParseResult>> parsedQuery) {
-    List<int[]> combinationIntArrays = generateCombinations(n, r);
-    LOGGER.debug("combinationIntArrays {}", combinationIntArrays);
+  public PredicateParseResult evaluateCombination(final int n, final int r,
+      List<List<PredicateParseResult>> parsedQuery) {
+    FixedLenBitset usedCols = new FixedLenBitset(n);
+    parsedQuery.forEach(list -> list.stream()
+        .filter(predicateParseResult -> (predicateParseResult.getCandidateDims().getCardinality() <= r))
+        .forEach(predicateParseResult -> usedCols.union(predicateParseResult.getCandidateDims())));
+    LOGGER.debug("totalUsed {}", usedCols.getCardinality());
+
+    List<Integer> usedColIDs = usedCols.getOffsets();
+    int nCapped = usedColIDs.size();
+    int rCapped = Math.min(r, nCapped);
+
+    int[] idToColID = new int[nCapped];
+    for (int i = 0; i < nCapped; i++) {
+      idToColID[i] = usedColIDs.get(i);
+    }
 
     // Enumerate all possible combinations of r-sized set, which will be applied indices on
-    List<FixedLenBitset> combinations = combinationIntArrays.parallelStream().map(combinationIntArray -> {
-      FixedLenBitset indices = new FixedLenBitset(n);
-      for (int j = 0; j < r; j++) {
-        indices.add(combinationIntArray[j]);
-      }
-      return indices;
-    }).collect(Collectors.toList());
+    List<int[]> combinationIntArrays = generateCombinations(nCapped, rCapped);
 
     // Calculate the total nESIWithIdx after applying each r-sized indices
     // and pick the top r-sized indices that minimize total nESIWithIdx
-    Optional<Pair<Double, FixedLenBitset>> optimal = combinations.parallelStream().map(fixedLenBitset -> {
+    Optional<Pair<Double, FixedLenBitset>> optimal = combinationIntArrays.parallelStream().map(combinationIntArray -> {
+      FixedLenBitset fixedLenBitset = new FixedLenBitset(n);
+      for (int j = 0; j < rCapped; j++) {
+        fixedLenBitset.add(idToColID[combinationIntArray[j]]);
+      }
       double nESIWithIdx = 0;
       for (List<PredicateParseResult> exclusiveRecommendations : parsedQuery) {
         double bestNESIWithIdx = exclusiveRecommendations.get(0).getnESI();
         for (PredicateParseResult predicateParseResult : exclusiveRecommendations) {
           if (fixedLenBitset.contains(predicateParseResult.getCandidateDims())) {
-            // If the dimensions of a candidate is a subsub set of the r-sized set
+            // If the dimensions of a candidate is a subset of the r-sized set
             bestNESIWithIdx = Math.min(bestNESIWithIdx, predicateParseResult.getnESIWithIdx());
           }
         }
@@ -226,12 +234,19 @@ public class InvertedSortedIndexJointRule extends AbstractRule {
       }
       return Pair.of(nESIWithIdx, fixedLenBitset);
     }).min(Comparator.comparing(Pair::getLeft));
-
-    return PredicateParseResult.PredicateParseResultBuilder.aPredicateParseResult()
-        .setCandidateDims(optimal.get().getRight())
-        .setIteratorEvalPriorityEnum(QueryInvertedSortedIndexRecommender.IteratorEvalPriorityEnum.INDEXED)
-        .setRecommendationPriorityEnum(QueryInvertedSortedIndexRecommender.RecommendationPriorityEnum.BITMAP)
-        .setnESI(_totalNESI).setPercentSelected(0).setnESIWithIdx(optimal.get().getLeft()).build();
+    if (optimal.isPresent()) {
+      return PredicateParseResult.PredicateParseResultBuilder.aPredicateParseResult()
+          .setCandidateDims(optimal.get().getRight())
+          .setIteratorEvalPriorityEnum(QueryInvertedSortedIndexRecommender.IteratorEvalPriorityEnum.INDEXED)
+          .setRecommendationPriorityEnum(QueryInvertedSortedIndexRecommender.RecommendationPriorityEnum.BITMAP)
+          .setnESI(_totalNESI).setPercentSelected(0).setnESIWithIdx(optimal.get().getLeft()).build();
+    } else {
+      return PredicateParseResult.PredicateParseResultBuilder.aPredicateParseResult()
+          .setCandidateDims(new FixedLenBitset(n))
+          .setIteratorEvalPriorityEnum(QueryInvertedSortedIndexRecommender.IteratorEvalPriorityEnum.INDEXED)
+          .setRecommendationPriorityEnum(QueryInvertedSortedIndexRecommender.RecommendationPriorityEnum.BITMAP)
+          .setnESI(_totalNESI).setPercentSelected(0).setnESIWithIdx(_totalNESI).build();
+    }
   }
 
   /**
@@ -241,6 +256,9 @@ public class InvertedSortedIndexJointRule extends AbstractRule {
    */
   public static List<int[]> generateCombinations(int n, int r) {
     List<int[]> combinations = new ArrayList<>();
+    if (r == 0) {
+      return combinations;
+    }
     int[] combination = new int[r];
 
     // initialize with lowest lexicographic combination
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/KafkaPartitionRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/KafkaPartitionRule.java
index 84c5655..adfd6e2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/KafkaPartitionRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/KafkaPartitionRule.java
@@ -54,7 +54,7 @@ public class KafkaPartitionRule extends AbstractRule {
         LOGGER.info("Recommending kafka partition configurations");
         LOGGER.info("*No kafka partition number found, recommending kafka partition number");
         _output.getPartitionConfig().setNumKafkaPartitions((int) Math
-            .ceil((double) _input._numMessagesPerSecInKafKaTopic / _params.KAFKA_NUM_MESSAGES_PER_SEC_PER_PARTITION));
+            .ceil((double) _input.getNumMessagesPerSecInKafkaTopic() / _params.KAFKA_NUM_MESSAGES_PER_SEC_PER_PARTITION));
         //Divide the messages/sec (total aggregate in the topic) by 250 to get an optimal value of the number of kafka partitions.
       }
       else{
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
index 4e2106d..475c53c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
@@ -19,10 +19,6 @@
 package org.apache.pinot.controller.recommender.rules.impl;
 
 import com.google.common.util.concurrent.AtomicDouble;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
 import org.apache.pinot.controller.recommender.exceptions.InvalidInputException;
 import org.apache.pinot.controller.recommender.io.ConfigManager;
 import org.apache.pinot.controller.recommender.io.InputManager;
@@ -37,6 +33,11 @@ import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.REALTIME;
 
 
@@ -85,7 +86,7 @@ public class NoDictionaryOnHeapDictionaryJointRule extends AbstractRule {
 
     // Exclude cols already having index
     noDictCols.removeAll(_output.getIndexConfig().getInvertedIndexColumns());
-    noDictCols.removeAll(_output.getIndexConfig().getSortedColumn());
+    noDictCols.remove(_output.getIndexConfig().getSortedColumn());
     noDictCols.removeAll(_output.getIndexConfig()
         .getRangeIndexColumns()); // TODO: Remove this after range index is implemented for no-dictionary
     LOGGER.debug("noDictCols {}", noDictCols);
@@ -139,7 +140,7 @@ public class NoDictionaryOnHeapDictionaryJointRule extends AbstractRule {
 
       if (_input.getTableType().equalsIgnoreCase(REALTIME)) {
         //TODO: improve this estimation
-        numRecordsPerPush = _input.getNumMessagesPerSecInKafKaTopic() * _input.getSegmentFlushTime();
+        numRecordsPerPush = _input.getNumMessagesPerSecInKafkaTopic() * _input.getSegmentFlushTime();
       } else { // For hybrid or offline table, nodictionary follows the offline side
         numRecordsPerPush = _input.getNumRecordsPerPush();
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
index 4000b4a..0e7cf57 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
@@ -106,30 +106,37 @@ public class PinotTablePartitionRule extends AbstractRule {
     LOGGER.info("*Recommending number of partitions ");
     int numKafkaPartitions = _output.getPartitionConfig().getNumKafkaPartitions();
     long offLineDataSizePerPush = _input.getNumRecordsPerPush() * _input.getSizePerRecord();
-    int optimalOfflinePartitions = (int)Math.ceil((double) offLineDataSizePerPush / _params.OPTIMAL_SIZE_PER_SEGMENT);
-    if (_input.getTableType().equalsIgnoreCase(REALTIME) || _input.getTableType()
-        .equalsIgnoreCase(HYBRID)) {
+    int optimalOfflinePartitions = (int) Math.ceil((double) offLineDataSizePerPush / _params.OPTIMAL_SIZE_PER_SEGMENT);
+    if (_input.getTableType().equalsIgnoreCase(REALTIME) || _input.getTableType().equalsIgnoreCase(HYBRID)) {
       //real time num of partitions should be the same value as the number of kafka partitions
-      _output.getPartitionConfig().setNumPartitionsRealtime(numKafkaPartitions);
+      if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsRealtimeOverwritten()) {
+        _output.getPartitionConfig().setNumPartitionsRealtime(numKafkaPartitions);
+      }
     }
     if (_input.getTableType().equalsIgnoreCase(OFFLINE)) {
       //Offline partition num is dependent on the amount of data coming in on a given day.
       //Using a very high value of numPartitions for small dataset size will result in too many small sized segments.
       //We define have a desirable segment size OPTIMAL_SIZE_PER_SEGMENT
       //Divide the size of data coming in on a given day by OPTIMAL_SIZE_PER_SEGMENT we get the number of partitions.
-      _output.getPartitionConfig()
-          .setNumPartitionsOffline((int) (optimalOfflinePartitions));
+      if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsOfflineOverwritten()) {
+        _output.getPartitionConfig().setNumPartitionsOffline((int) (optimalOfflinePartitions));
+      }
     }
     if (_input.getTableType().equalsIgnoreCase(HYBRID)) {
-      _output.getPartitionConfig().setNumPartitionsOffline(
-          Math.min(optimalOfflinePartitions, numKafkaPartitions));
+      if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsOfflineOverwritten()) {
+        _output.getPartitionConfig().setNumPartitionsOffline(Math.min(optimalOfflinePartitions, numKafkaPartitions));
+      }
+    }
+
+    if (_input.getOverWrittenConfigs().getPartitionConfig().isPartitionDimensionOverwritten()){
+      return;
     }
 
     LOGGER.info("*Recommending column to partition");
 
     double[] weights = new double[_input.getNumDims()];
     _input.getParsedQueries().forEach(query -> {
-      Double weight =_input.getQueryWeight(query);
+      Double weight = _input.getQueryWeight(query);
       FixedLenBitset fixedLenBitset = parseQuery(_input.getQueryContext(query));
       LOGGER.debug("fixedLenBitset:{}", fixedLenBitset);
       if (fixedLenBitset != null) {
@@ -155,11 +162,9 @@ public class PinotTablePartitionRule extends AbstractRule {
     Optional<Pair<String, Double>> max = columnNameWeightPairRank.stream()
         .filter(columnNameWeightPair -> columnNameWeightPair.getRight() > topCandidatesThreshold)
         // filter out the dims with frequency < threshold (THRESHOLD_RATIO_DIMENSION_TO_PARTITION_TOP_CANDIDATES * max frequency)
-        .max(
-            Comparator.comparing(columnNameWeightPair -> _input.getCardinality(columnNameWeightPair.getLeft())));
+        .max(Comparator.comparing(columnNameWeightPair -> _input.getCardinality(columnNameWeightPair.getLeft())));
     // get the dimension with highest cardinality
-    max.ifPresent(
-        stringDoublePair -> _output.getPartitionConfig().setPartitionDimension(stringDoublePair.getLeft()));
+    max.ifPresent(stringDoublePair -> _output.getPartitionConfig().setPartitionDimension(stringDoublePair.getLeft()));
   }
 
   /**
@@ -208,7 +213,7 @@ public class PinotTablePartitionRule extends AbstractRule {
       } else if (!_input.isDim(colName)) {
         LOGGER.error("Error: Column {} should not appear in filter, ignoring this", colName);
         return null;
-      } else if(!_input.isSingleValueColumn(colName)){ // only SV column can be used as partitioning column
+      } else if (!_input.isSingleValueColumn(colName)) { // only SV column can be used as partitioning column
         LOGGER.trace("Skipping the MV column {}", colName);
       } else if (predicateType == Predicate.Type.IN) {
         int numValuesSelected;
@@ -216,7 +221,9 @@ public class PinotTablePartitionRule extends AbstractRule {
         InPredicate inPredicate = ((InPredicate) predicate);
         boolean isFirst = false;
         List<String> values = inPredicate.getValues();
-        if (values.get(FIRST).equals(IN_PREDICATE_ESTIMATE_LEN_FLAG) || (isFirst =
+        if (values.size() == 1) {
+          numValuesSelected = 1;
+        } else if (values.get(FIRST).equals(IN_PREDICATE_ESTIMATE_LEN_FLAG) || (isFirst =
             values.get(SECOND).equals(IN_PREDICATE_ESTIMATE_LEN_FLAG))) {
           numValuesSelected = Integer.parseInt(values.get(isFirst ? FIRST : SECOND));
         } else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
index 6061935..14458cc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
@@ -31,13 +31,15 @@ import java.util.Set;
 public class IndexConfig {
     Set<String> _invertedIndexColumns = new HashSet<>();
     Set<String> _rangeIndexColumns = new HashSet<>();
-    Set<String> _sortedColumn = new HashSet<>();
+    String _sortedColumn = "";
     Set<String> _bloomFilterColumns = new HashSet<>();
 
     Set<String> _noDictionaryColumns = new HashSet<>();
     Set<String> _onHeapDictionaryColumns = new HashSet<>();
     Set<String> _variedLengthDictionaryColumns = new HashSet<>();
 
+    boolean _isSortedColumnOverwritten = false;
+
     @JsonSetter(nulls = Nulls.SKIP)
     public void setVariedLengthDictionaryColumns(Set<String> variedLengthDictionaryColumns) {
         _variedLengthDictionaryColumns = variedLengthDictionaryColumns;
@@ -64,8 +66,9 @@ public class IndexConfig {
     }
 
     @JsonSetter(nulls = Nulls.SKIP)
-    public void setSortedColumn(Set<String> sortedColumn) {
+    public void setSortedColumn(String sortedColumn) {
         this._sortedColumn = sortedColumn;
+        this._isSortedColumnOverwritten = true;
     }
 
     @JsonSetter(nulls = Nulls.SKIP)
@@ -73,6 +76,9 @@ public class IndexConfig {
         this._rangeIndexColumns = rangeIndexColumns;
     }
 
+    public boolean isSortedColumnOverwritten() {
+        return _isSortedColumnOverwritten;
+    }
 
     public Set<String> getVariedLengthDictionaryColumns() {
         return _variedLengthDictionaryColumns;
@@ -94,7 +100,7 @@ public class IndexConfig {
         return _invertedIndexColumns;
     }
 
-    public Set<String> getSortedColumn() {
+    public String getSortedColumn() {
         return _sortedColumn;
     }
 
@@ -107,7 +113,7 @@ public class IndexConfig {
     }
 
     public boolean hasSortedIndex(String colName){
-        return _sortedColumn.contains(colName);
+        return _sortedColumn.equals(colName);
     };
 
     public boolean hasRangeIndex(String colName){
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/PartitionConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/PartitionConfig.java
index 4609b63..581c524 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/PartitionConfig.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/PartitionConfig.java
@@ -34,6 +34,10 @@ public class PartitionConfig {
   int _numPartitionsOffline;
   int _numPartitionsRealtime;
 
+  boolean _isNumPartitionsOfflineOverwritten = false;
+  boolean _isNumPartitionsRealtimeOverwritten = false;
+  boolean _isPartitionDimensionOverwritten = false;
+
   public PartitionConfig() {
     this._partitionDimension = "";
     this._numKafkaPartitions = DEFAULT_NUM_KAFKA_PARTITIONS;
@@ -48,6 +52,7 @@ public class PartitionConfig {
   @JsonSetter(nulls = Nulls.SKIP)
   public void setNumPartitionsRealtime(int numPartitionsRealtime) {
     _numPartitionsRealtime = numPartitionsRealtime;
+    _isNumPartitionsRealtimeOverwritten = true;
   }
 
   public String getPartitionDimension() {
@@ -57,6 +62,7 @@ public class PartitionConfig {
   @JsonSetter(nulls = Nulls.SKIP)
   public void setPartitionDimension(String partitionDimension) {
     this._partitionDimension = partitionDimension;
+    this._isPartitionDimensionOverwritten = true;
   }
 
   public int getNumPartitionsOffline() {
@@ -66,6 +72,7 @@ public class PartitionConfig {
   @JsonSetter(nulls = Nulls.SKIP)
   public void setNumPartitionsOffline(int numPartitionsOffline) {
     this._numPartitionsOffline = numPartitionsOffline;
+    this._isNumPartitionsOfflineOverwritten = true;
   }
 
   public int getNumKafkaPartitions() {
@@ -75,4 +82,16 @@ public class PartitionConfig {
   public void setNumKafkaPartitions(int numKafkaPartitions) {
     this._numKafkaPartitions = numKafkaPartitions;
   }
+
+  public boolean isNumPartitionsOfflineOverwritten() {
+    return _isNumPartitionsOfflineOverwritten;
+  }
+
+  public boolean isNumPartitionsRealtimeOverwritten() {
+    return _isNumPartitionsRealtimeOverwritten;
+  }
+
+  public boolean isPartitionDimensionOverwritten() {
+    return _isPartitionDimensionOverwritten;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/InvertedSortedIndexJointRuleParams.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/InvertedSortedIndexJointRuleParams.java
index e806e40..a50a9ba 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/InvertedSortedIndexJointRuleParams.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/InvertedSortedIndexJointRuleParams.java
@@ -56,8 +56,8 @@ public class InvertedSortedIndexJointRuleParams {
   // of documents selected out. Thus we use default values.
   public Double PERCENT_SELECT_FOR_FUNCTION = DEFAULT_PERCENT_SELECT_FOR_FUNCTION;
   public Double PERCENT_SELECT_FOR_TEXT_MATCH = DEFAULT_PERCENT_SELECT_FOR_TEXT_MATCH;
-  public Double PERCENT_SELECT_FOR_RANGE = DEAFULT_PERCENT_SELECT_FOR_RANGE;
-  public Double PERCENT_SELECT_FOR_REGEX = DEAFULT_PERCENT_SELECT_FOR_REGEX;
+  public Double PERCENT_SELECT_FOR_RANGE = DEFAULT_PERCENT_SELECT_FOR_RANGE;
+  public Double PERCENT_SELECT_FOR_REGEX = DEFAULT_PERCENT_SELECT_FOR_REGEX;
   public Double PERCENT_SELECT_FOR_ISNULL = DEFAULT_PERCENT_SELECT_FOR_ISNULL;
 
 
@@ -101,7 +101,7 @@ public class InvertedSortedIndexJointRuleParams {
     return THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES;
   }
 
-  @JsonSetter(value = "THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_VOTES", nulls = Nulls.SKIP)
+  @JsonSetter(value = "THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES", nulls = Nulls.SKIP)
   public void setTHRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES(Double THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES) {
     this.THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES = THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java
index df261c2..0d47628 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java
@@ -54,7 +54,7 @@ public class PartitionRuleParams {
     return THRESHOLD_MAX_IN_LENGTH;
   }
 
-  @JsonSetter(value = "THRESHOLD_RATIO_MAX_IN_LENGTH", nulls = Nulls.SKIP)
+  @JsonSetter(value = "THRESHOLD_MAX_IN_LENGTH", nulls = Nulls.SKIP)
   public void setTHRESHOLD_MAX_IN_LENGTH(Integer THRESHOLD_MAX_IN_LENGTH) {
     this.THRESHOLD_MAX_IN_LENGTH = THRESHOLD_MAX_IN_LENGTH;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
index 0661136..4a355b9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
@@ -26,13 +26,13 @@ public class RecommenderConstants {
   public static class InvertedSortedIndexJointRule {
     public static final double DEFAULT_PERCENT_SELECT_FOR_FUNCTION = 0.5d;
     public static final double DEFAULT_PERCENT_SELECT_FOR_TEXT_MATCH = 0.5d;
-    public static final double DEAFULT_PERCENT_SELECT_FOR_RANGE = 0.5d;
-    public static final double DEAFULT_PERCENT_SELECT_FOR_REGEX = 0.5d;
+    public static final double DEFAULT_PERCENT_SELECT_FOR_RANGE = 0.5d;
+    public static final double DEFAULT_PERCENT_SELECT_FOR_REGEX = 0.5d;
     public static final double DEFAULT_PERCENT_SELECT_FOR_ISNULL = 0.5d;
     public static final double DEFAULT_THRESHOLD_MIN_AND_PREDICATE_INCREMENTAL_VOTE = 0.6d;
     public static final double DEFAULT_THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES = 0.8d;
     public static final double DEFAULT_THRESHOLD_RATIO_MIN_GAIN_DIFF_BETWEEN_ITERATION = 0.05d;
-    public static final int DEFAULT_MAX_NUM_ITERATION_WITHOUT_GAIN = 3;
+    public static final int DEFAULT_MAX_NUM_ITERATION_WITHOUT_GAIN = 2;
     public static final double DEFAULT_THRESHOLD_RATIO_MIN_NESI_FOR_TOP_CANDIDATES = 0.7d;
   }
 
@@ -47,7 +47,7 @@ public class RecommenderConstants {
   }
 
   public static class PartitionRule {
-    public static final int DEFAULT_NUM_PARTITIONS = 0;
+    public static final int DEFAULT_NUM_PARTITIONS = 1;
 
     public static final long DEFAULT_THRESHOLD_MAX_LATENCY_SLA_PARTITION = 1000;
     public static final long DEFAULT_THRESHOLD_MIN_QPS_PARTITION = 200;
@@ -93,6 +93,10 @@ public class RecommenderConstants {
   public static final double EPSILON = 0.0001d; // used for double value comparison, margin of error
   public static final int DEFAULT_NUM_KAFKA_PARTITIONS = 0;
   public static final int DEFAULT_SEGMENT_FLUSH_TIME = 86400;
+  public static final long DEFAULT_QPS = 5;
+  public static final long DEFAULT_NUM_MESSAGES_PER_SEC_IN_KAFKA_TOPIC = 250;
+  public static final long DEFAULT_NUM_RECORDS_PER_PUSH = 10000;
+  public static final long DEFAULT_LATENCY_SLA = 500;
 
   public static final int NO_SUCH_COL = -1; // No such colname in colName to ID mapping
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
index fc52528..d6612b2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/utils/QueryInvertedSortedIndexRecommender.java
@@ -18,15 +18,6 @@
  */
 package org.apache.pinot.controller.recommender.rules.utils;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.controller.recommender.io.InputManager;
 import org.apache.pinot.controller.recommender.rules.io.configs.IndexConfig;
@@ -43,9 +34,10 @@ import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.controller.recommender.rules.utils.PredicateParseResult.NESI_ZERO;
-import static org.apache.pinot.controller.recommender.rules.utils.PredicateParseResult.PERCENT_SELECT_ALL;
-import static org.apache.pinot.controller.recommender.rules.utils.PredicateParseResult.PERCENT_SELECT_ZERO;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.apache.pinot.controller.recommender.rules.utils.PredicateParseResult.*;
 
 
 /**
@@ -220,7 +212,7 @@ public class QueryInvertedSortedIndexRecommender {
         double threshold =
             _params.THRESHOLD_RATIO_MIN_AND_PREDICATE_TOP_CANDIDATES * (totalNESI - totalNESIWithIdxSorted.get(0)
                 .getLeft());
-        LOGGER.debug("threshold {}",threshold);
+        LOGGER.debug("threshold {}", threshold);
         for (Pair<Double, PredicateParseResult> pair : totalNESIWithIdxSorted) {
           if ((totalNESI - pair.getLeft()) >= threshold) { // TOP candidates
             ret.add(PredicateParseResult.PredicateParseResultBuilder.aPredicateParseResult()
@@ -249,8 +241,8 @@ public class QueryInvertedSortedIndexRecommender {
 
           nESIWithIdx += previousPair.getRight().getnESIWithIdx();
           percentForCandidates *= previousPair.getRight().getPercentSelected();
-          LOGGER.debug("childResults {}",childResults);
-          LOGGER.debug("nESIWithIdx {}",nESIWithIdx);
+          LOGGER.debug("childResults {}", childResults);
+          LOGGER.debug("nESIWithIdx {}", nESIWithIdx);
 
           double tmpTotalNESIWithIdx = nESIWithIdx;
           double tmpPercentSelected = percentForCandidates;
@@ -258,7 +250,7 @@ public class QueryInvertedSortedIndexRecommender {
             tmpTotalNESIWithIdx += childResult.getnESI() * tmpPercentSelected;
             tmpPercentSelected *= childResult.getPercentSelected();
           }
-          LOGGER.debug("tmpTotalNESIWithIdx {}",tmpTotalNESIWithIdx);
+          LOGGER.debug("tmpTotalNESIWithIdx {}", tmpTotalNESIWithIdx);
           if (previousTotalNESIWithIdx - tmpTotalNESIWithIdx > _params.THRESHOLD_MIN_AND_PREDICATE_INCREMENTAL_VOTE) {
             ret.add(
                 PredicateParseResult.PredicateParseResultBuilder.aPredicateParseResult().setCandidateDims(candidateDims)
@@ -538,11 +530,7 @@ public class QueryInvertedSortedIndexRecommender {
     // Not a valid dimension name
     else if (!_inputManager.isDim(colName)) {
       LOGGER.error("Error: Column {} should not appear in filter", colName);
-      return PredicateParseResult.PredicateParseResultBuilder.aPredicateParseResult()
-          .setCandidateDims(FixedLenBitset.IMMUTABLE_EMPTY_SET)
-          .setIteratorEvalPriorityEnum(IteratorEvalPriorityEnum.SCAN)
-          .setRecommendationPriorityEnum(RecommendationPriorityEnum.NON_CANDIDATE_SCAN).setnESI(numValuesPerEntry)
-          .setPercentSelected(_params.PERCENT_SELECT_FOR_RANGE).setnESIWithIdx(numValuesPerEntry).build();
+      return null;
     }
     // e.g. a > 10 / b between 1 and 10
     else if (type == Predicate.Type.RANGE) {
@@ -584,10 +572,15 @@ public class QueryInvertedSortedIndexRecommender {
       int numValuesSelected;
 
       boolean isFirst = false;
-      List<String> values = (type == Predicate.Type.IN)?((InPredicate) predicate).getValues():((NotInPredicate) predicate).getValues();
-      if (values.get(RecommenderConstants.FIRST).equals(RecommenderConstants.IN_PREDICATE_ESTIMATE_LEN_FLAG) ||
-          (isFirst=values.get(RecommenderConstants.SECOND).equals(RecommenderConstants.IN_PREDICATE_ESTIMATE_LEN_FLAG))){
-        numValuesSelected = Integer.parseInt(values.get(isFirst? RecommenderConstants.FIRST: RecommenderConstants.SECOND));
+      List<String> values = (type == Predicate.Type.IN) ? ((InPredicate) predicate).getValues()
+          : ((NotInPredicate) predicate).getValues();
+      if (values.size() == 1) {
+        numValuesSelected = 1;
+      } else if (values.get(RecommenderConstants.FIRST).equals(RecommenderConstants.IN_PREDICATE_ESTIMATE_LEN_FLAG) || (
+          isFirst =
+              values.get(RecommenderConstants.SECOND).equals(RecommenderConstants.IN_PREDICATE_ESTIMATE_LEN_FLAG))) {
+        numValuesSelected =
+            Integer.parseInt(values.get(isFirst ? RecommenderConstants.FIRST : RecommenderConstants.SECOND));
       } else {
         numValuesSelected = values.size();
       }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
index 30a4e7f..bd0703f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
@@ -111,7 +111,16 @@ public class TestConfigEngine {
         RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.InvertedSortedIndexJointRule, _input, output);
     abstractRule.run();
     Assert.assertEquals(output.getIndexConfig().getInvertedIndexColumns().toString(), "[e, f, j]");
-    Assert.assertEquals(output.getIndexConfig().getSortedColumn().toString(), "[c]");
+    Assert.assertEquals(output.getIndexConfig().getSortedColumn(), "c");
+  }
+
+  @Test
+  void testEngineEmptyQueries()
+      throws InvalidInputException, IOException {
+    URL resourceUrl = getClass().getClassLoader().getResource("recommenderInput/EmptyQueriesInput.json");
+    File file = new File(resourceUrl.getFile());
+    String input = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
+    RecommenderDriver.run(input);
   }
 
   @Test
diff --git a/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json
index f84f819..ee38092 100644
--- a/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json
@@ -128,7 +128,7 @@
     "select f from tableName where a in (1,2,3) and b in ('#VALUES',4) and c in ('#VALUES',5) and t between 1 and 1000": 2
   },
   "qps": 250,
-  "numMessagesPerSecInKafKaTopic":1000,
+  "numMessagesPerSecInKafkaTopic":1000,
   "numRecordsPerPush":1000000000,
   "tableType": "HYBRID",
   "latencySLA": 500
diff --git a/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json b/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json
index c89ed5a..f1f083a 100644
--- a/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json
@@ -56,7 +56,7 @@
   },
 
   "qps": 15000,
-  "numMessagesPerSecInKafKaTopic":1000,
+  "numMessagesPerSecInKafkaTopic":1000,
   "numRecordsPerPush":1000000000,
   "tableType": "HYBRID",
   "latencySLA": 500,
diff --git a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json b/pinot-controller/src/test/resources/recommenderInput/EmptyQueriesInput.json
similarity index 53%
copy from pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
copy to pinot-controller/src/test/resources/recommenderInput/EmptyQueriesInput.json
index b3ce1fe..d16e8f6 100644
--- a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/EmptyQueriesInput.json
@@ -4,19 +4,19 @@
     "dimensionFieldSpecs": [
       {
         "name": "a",
-        "dataType": "INT",
-        "cardinality":1000001,
+        "dataType": "STRING",
+        "cardinality":20,
         "numValuesPerEntry":1
       },
       {
         "name": "b",
-        "dataType": "DOUBLE",
+        "dataType": "BYTES",
         "cardinality":6,
-        "numValuesPerEntry":1.5
+        "numValuesPerEntry":1
       },
       {
         "name": "c",
-        "dataType": "FLOAT",
+        "dataType": "BYTES",
         "cardinality":7,
         "numValuesPerEntry":1
       },
@@ -32,39 +32,6 @@
         "dataType": "LONG",
         "cardinality":18,
         "numValuesPerEntry":4
-      },
-      {
-        "name": "f",
-        "dataType": "DOUBLE",
-        "cardinality":13,
-        "numValuesPerEntry":3
-      },
-      {
-        "name": "g",
-        "dataType": "STRING",
-        "cardinality":6,
-        "numValuesPerEntry":2,
-        "averageLength" : 100
-      },
-      {
-        "name": "h",
-        "dataType": "BYTES",
-        "cardinality":12,
-        "numValuesPerEntry":1,
-        "averageLength" : 10
-      },
-      {
-        "name": "i",
-        "dataType": "STRING",
-        "cardinality":7,
-        "numValuesPerEntry":1,
-        "averageLength" : 25
-      },
-      {
-        "name": "j",
-        "dataType": "DOUBLE",
-        "cardinality":4,
-        "numValuesPerEntry":1
       }
     ],
     "metricFieldSpecs": [
@@ -88,25 +55,6 @@
         "cardinality":10000,
         "numValuesPerEntry":1,
         "averageLength" : 25
-      },
-      {
-        "name": "n",
-        "dataType": "DOUBLE",
-        "cardinality":10000,
-        "numValuesPerEntry":1
-      },
-      {
-        "name": "o",
-        "dataType": "DOUBLE",
-        "cardinality":10000,
-        "numValuesPerEntry":1,
-        "averageLength" : 25
-      },
-      {
-        "name": "p",
-        "dataType": "DOUBLE",
-        "cardinality":10000,
-        "numValuesPerEntry":1
       }
     ],
     "timeFieldSpec": {
@@ -119,8 +67,9 @@
       }
     }
   },
-
-  "numMessagesPerSecInKafKaTopic":1000,
+  "qps": 250,
+  "numKafkaPartitions": 32,
+  "numRecordsPerPush":10000000,
   "tableType": "HYBRID",
-  "numRecordsPerPush":1000000000
+  "latencySLA": 500
 }
diff --git a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
index b3ce1fe..4bf697f 100644
--- a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
@@ -120,7 +120,7 @@
     }
   },
 
-  "numMessagesPerSecInKafKaTopic":1000,
+  "numMessagesPerSecInKafkaTopic":1000,
   "tableType": "HYBRID",
   "numRecordsPerPush":1000000000
 }
diff --git a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json
index 8ac340c..e8f370f 100644
--- a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json
+++ b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json
@@ -120,7 +120,7 @@
     }
   },
   "numKafkaPartitions": 16,
-  "numMessagesPerSecInKafKaTopic": 1000,
+  "numMessagesPerSecInKafkaTopic": 1000,
   "tableType": "HYBRID",
   "numRecordsPerPush":1000000000
 }
diff --git a/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json b/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json
index 1492634..80218b7 100644
--- a/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json
@@ -128,7 +128,7 @@
     "select m, n from tableName where f = 'qwerty' ": 1
   },
   "qps": 15000,
-  "numMessagesPerSecInKafKaTopic":1000,
+  "numMessagesPerSecInKafkaTopic":1000,
   "numRecordsPerPush":1000000000,
   "tableType": "HYBRID",
   "latencySLA": 500,
diff --git a/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json b/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json
index 91152ec..190e4ef 100644
--- a/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json
@@ -128,7 +128,7 @@
     "select f from tableName where t between 1 and 1000": 2
   },
   "qps": 15000,
-  "numMessagesPerSecInKafKaTopic":1000,
+  "numMessagesPerSecInKafkaTopic":1000,
   "numRecordsPerPush":1000000000,
   "tableType": "HYBRID",
   "latencySLA": 500,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org