You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/05/11 23:12:51 UTC

[incubator-pinot] branch master updated: Add segment size rule to Recommendation Engine (#6869)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d76d5e2  Add segment size rule to Recommendation Engine (#6869)
d76d5e2 is described below

commit d76d5e26d961f21135420ff977d32ee69197b8cc
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Tue May 11 16:12:32 2021 -0700

    Add segment size rule to Recommendation Engine (#6869)
    
    * Add segment size rule to Rule Engine
    
    * Fix a small issue
    
    * Remove unused import
    
    * Fix license header for a new file
    
    * Update a comment
    
    * Adjust PartitionRule to not throw NPE for RT only tables
    
    * Add an option to provide actual segment size & numRows
    
    * Addressed Sidd's comments
    
    * Apply PR's comments
    
    * Remove unused imports
    
    * Apply latest comments
    
    * mvn spotless:apply
---
 .../controller/recommender/RecommenderDriver.java  |  31 +++-
 .../data/generator/StringGenerator.java            |  26 ++--
 .../controller/recommender/io/ConfigManager.java   |  11 ++
 .../controller/recommender/io/InputManager.java    |  92 +++---------
 .../controller/recommender/rules/AbstractRule.java |   7 +
 .../recommender/rules/RulesToExecute.java          |  16 ++-
 .../rules/impl/PinotTablePartitionRule.java        |  66 +++++----
 .../rules/impl/RealtimeProvisioningRule.java       |   2 +-
 .../recommender/rules/impl/SegmentSizeRule.java    | 160 +++++++++++++++++++++
 .../io/configs/SegmentSizeRecommendations.java     |  84 +++++++++++
 .../rules/io/params/PartitionRuleParams.java       |  12 --
 .../io/params/RealtimeProvisioningRuleParams.java  |  11 ++
 .../rules/io/params/RecommenderConstants.java      |  11 +-
 .../rules/io/params/SegmentSizeRuleParams.java     |  81 +++++++++++
 .../controller/recommender/TestConfigEngine.java   |  45 +++++-
 .../rules/impl/PinotTablePartitionRuleTest.java    |  75 ++++++++++
 .../rules/impl/SegmentSizeRuleTest.java            |  89 ++++++++++++
 .../recommenderInput/BloomFilterInput.json         |   7 +-
 .../recommenderInput/DataSizeCalculationInput.json |   2 +
 .../recommenderInput/EmptyQueriesInput.json        |   4 +-
 .../resources/recommenderInput/FlagQueryInput.json |   7 +-
 .../resources/recommenderInput/InvalidInput1.json  |   5 +-
 .../resources/recommenderInput/InvalidInput2.json  |   4 +-
 .../recommenderInput/KafkaPartitionRuleInput.json  |   4 +-
 .../recommenderInput/KafkaPartitionRuleInput2.json |   7 +-
 ...NoDictionaryOnHeapDictionaryJointRuleInput.json |   5 +
 .../PinotTablePartitionRuleInput.json              |   7 +-
 .../RealtimeProvisioningInput_dateTimeColumn.json  |   5 +-
 .../RealtimeProvisioningInput_timeColumn.json      |   4 +-
 ...teTimeColumn.json => SegmentSizeRuleInput.json} |  21 ++-
 ...mentSizeRuleInput_noNeedToGenerateSegment.json} |  22 ++-
 ...=> SegmentSizeRuleInput_realtimeOnlyTable.json} |  40 ++----
 ...ut_ruleIsDisableButItNeedsToBeSilentlyRun.json} |  38 ++---
 .../recommenderInput/SortedInvertedIndexInput.json |   7 +-
 .../VariedLengthDictionaryInput.json               |   2 +-
 35 files changed, 770 insertions(+), 240 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/RecommenderDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/RecommenderDriver.java
index f1a71ff..1be6059 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/RecommenderDriver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/RecommenderDriver.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.SerializationFeature;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.pinot.controller.recommender.exceptions.InvalidInputException;
 import org.apache.pinot.controller.recommender.io.ConfigManager;
 import org.apache.pinot.controller.recommender.io.InputManager;
@@ -54,27 +56,44 @@ public class RecommenderDriver {
     inputManager.init();
     outputManager = inputManager.getOverWrittenConfigs();
 
-    for (RulesToExecute.Rule value : RulesToExecute.Rule.values()) {
+    // silent rules will run, but their output will only be used in other rules and will not be presented to the user
+    List<AbstractRule> silentRules = new ArrayList<>();
+
+    for (RulesToExecute.Rule rule : RulesToExecute.Rule.values()) {
       try {
         Method ruleExecuteFlag =
-            inputManager.getRulesToExecute().getClass().getDeclaredMethod(RULE_EXECUTION_PREFIX + value.name().replace(RULE_EXECUTION_SUFFIX,""));
+            inputManager.getRulesToExecute().getClass().getDeclaredMethod(RULE_EXECUTION_PREFIX + rule.name().replace(RULE_EXECUTION_SUFFIX,""));
         LOGGER.info("{}:{}", ruleExecuteFlag.getName(), ruleExecuteFlag.invoke(inputManager.getRulesToExecute()));
-        if (!(boolean) ruleExecuteFlag.invoke(inputManager.getRulesToExecute())) {
-          continue;
+        boolean shouldRun = (boolean) ruleExecuteFlag.invoke(inputManager.getRulesToExecute());
+        boolean shouldSilentlyRun = false;
+        if (!shouldRun) {
+          shouldSilentlyRun = shouldSilentlyRun(rule, inputManager);
+          if (!shouldSilentlyRun) {
+            continue;
+          }
         }
-        AbstractRule abstractRule = RulesToExecute.RuleFactory.getRule(value, inputManager, outputManager);
+        AbstractRule abstractRule = RulesToExecute.RuleFactory.getRule(rule, inputManager, outputManager);
         if (abstractRule != null) {
           abstractRule.run();
+          if (shouldSilentlyRun) {
+            silentRules.add(abstractRule);
+          }
         }
       } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-        LOGGER.error("Error while executing strategy:{}", value, e);
+        LOGGER.error("Error while executing strategy:{}", rule, e);
       }
     }
     try {
+      silentRules.forEach(AbstractRule::hideOutput);
       return objectMapper.writeValueAsString(outputManager);
     } catch (JsonProcessingException e) {
       LOGGER.error("Error while writing the output json string! Stack trace:", e);
     }
     return "";
   }
+
+  private static boolean shouldSilentlyRun(RulesToExecute.Rule rule, InputManager input) {
+    return rule == RulesToExecute.Rule.SegmentSizeRule &&
+        (input.getTableType().equalsIgnoreCase("OFFLINE") || input.getTableType().equalsIgnoreCase("HYBRID"));
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java
index e3870c6..c493ec0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/StringGenerator.java
@@ -19,12 +19,9 @@
 package org.apache.pinot.controller.recommender.data.generator;
 
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Random;
-import java.util.Set;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
 
 
 /**
@@ -38,30 +35,28 @@ public class StringGenerator implements Generator {
   private final int cardinality;
   private final Random rand;
   private final double numberOfValuesPerEntry;
-  private final int lengthOfEachString;
 
-  private List<String> vals;
+  private final String initialValue;
+  private final int counterLength;
   private int counter = 0;
 
   public StringGenerator(Integer cardinality, Double numberOfValuesPerEntry, Integer lengthOfEachString) {
     this.cardinality = cardinality;
     this.numberOfValuesPerEntry =
         numberOfValuesPerEntry != null ? numberOfValuesPerEntry : DEFAULT_NUMBER_OF_VALUES_PER_ENTRY;
-    this.lengthOfEachString = lengthOfEachString != null ? lengthOfEachString : DEFAULT_LENGTH_OF_EACH_STRING;
+    lengthOfEachString = lengthOfEachString != null ? lengthOfEachString : DEFAULT_LENGTH_OF_EACH_STRING;
     Preconditions.checkState(this.numberOfValuesPerEntry >= 1,
         "Number of values per entry (should be >= 1): " + this.numberOfValuesPerEntry);
+    counterLength = String.valueOf(this.cardinality).length();
+    int initValueSize = lengthOfEachString - counterLength;
+    Preconditions.checkState(initValueSize >= 0,
+        String.format("Cannot generate %d unique string with length %d", this.cardinality, lengthOfEachString));
+    initialValue = RandomStringUtils.randomAlphabetic(initValueSize);
     rand = new Random(System.currentTimeMillis());
   }
 
   @Override
   public void init() {
-    final Set<String> uniqueStrings = new HashSet<>();
-    for (int i = 0; i < cardinality; i++) {
-      while (!uniqueStrings.add(RandomStringUtils.randomAlphabetic(lengthOfEachString))) {
-        uniqueStrings.add(RandomStringUtils.randomAlphabetic(lengthOfEachString));
-      }
-    }
-    vals = new ArrayList<>(uniqueStrings);
   }
 
   @Override
@@ -76,7 +71,8 @@ public class StringGenerator implements Generator {
     if (counter == cardinality) {
       counter = 0;
     }
-    return vals.get(counter++);
+    counter++;
+    return initialValue + StringUtils.leftPad(String.valueOf(counter), counterLength, '0');
   }
 
   public static void main(String[] args) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java
index 1e43893..faafafe 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/ConfigManager.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.pinot.controller.recommender.rules.io.FlaggedQueries;
 import org.apache.pinot.controller.recommender.rules.io.configs.IndexConfig;
 import org.apache.pinot.controller.recommender.rules.io.configs.PartitionConfig;
+import org.apache.pinot.controller.recommender.rules.io.configs.SegmentSizeRecommendations;
 
 
 /**
@@ -37,6 +38,7 @@ import org.apache.pinot.controller.recommender.rules.io.configs.PartitionConfig;
  * The engine will do it's job of recommending by taking into account the overwritten config and honoring it.
  */
 public class ConfigManager {
+  SegmentSizeRecommendations _segmentSizeRecommendations;
   IndexConfig _indexConfig = new IndexConfig();
   PartitionConfig _partitionConfig = new PartitionConfig();
   FlaggedQueries _flaggedQueries = new FlaggedQueries();
@@ -44,6 +46,11 @@ public class ConfigManager {
   boolean _aggregateMetrics = false;
 
   @JsonSetter(nulls = Nulls.SKIP)
+  public void setSegmentSizeRecommendations(SegmentSizeRecommendations segmentSizeRecommendations) {
+    _segmentSizeRecommendations = segmentSizeRecommendations;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
   public void setIndexConfig(IndexConfig indexConfig) {
     _indexConfig = indexConfig;
   }
@@ -69,6 +76,10 @@ public class ConfigManager {
     _aggregateMetrics = aggregateMetrics;
   }
 
+  public SegmentSizeRecommendations getSegmentSizeRecommendations() {
+    return _segmentSizeRecommendations;
+  }
+
   public IndexConfig getIndexConfig() {
     return _indexConfig;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
index 064e44f..2635744 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
@@ -47,6 +47,7 @@ import org.apache.pinot.controller.recommender.rules.io.params.InvertedSortedInd
 import org.apache.pinot.controller.recommender.rules.io.params.NoDictionaryOnHeapDictionaryJointRuleParams;
 import org.apache.pinot.controller.recommender.rules.io.params.PartitionRuleParams;
 import org.apache.pinot.controller.recommender.rules.io.params.RealtimeProvisioningRuleParams;
+import org.apache.pinot.controller.recommender.rules.io.params.SegmentSizeRuleParams;
 import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -63,7 +64,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.lang.Math.max;
-import static java.lang.Math.pow;
 import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.*;
 import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.FlagQueryRuleParams.ERROR_INVALID_QUERY;
 
@@ -96,11 +96,6 @@ public class InputManager {
   // (consuming segments -> online segments)
   public Integer _segmentFlushTime = DEFAULT_SEGMENT_FLUSH_TIME;
 
-  // If the cardinality given by the customer is the global cardinality for the dataset (even potential data)
-  // If true, the cardinality will be regulated see regulateCardinality()
-  // TODO: Set to dsiabled for now, will discuss this in the next PR
-  public boolean _useCardinalityNormalization = DEFAULT_USE_CARDINALITY_NORMALIZATION;
-
   // The parameters of rules
   public PartitionRuleParams _partitionRuleParams = new PartitionRuleParams();
   public InvertedSortedIndexJointRuleParams _invertedSortedIndexJointRuleParams =
@@ -110,6 +105,7 @@ public class InputManager {
       new NoDictionaryOnHeapDictionaryJointRuleParams();
   public FlagQueryRuleParams _flagQueryRuleParams = new FlagQueryRuleParams();
   public RealtimeProvisioningRuleParams _realtimeProvisioningRuleParams;
+  public SegmentSizeRuleParams _segmentSizeRuleParams = new SegmentSizeRuleParams();
 
   // For forward compatibility: 1. dev/sre to overwrite field(s) 2. incremental recommendation on existing/staging tables
   public ConfigManager _overWrittenConfigs = new ConfigManager();
@@ -149,23 +145,16 @@ public class InputManager {
     reorderDimsAndBuildMap();
     registerColNameFieldType();
     validateQueries();
-    if (_useCardinalityNormalization) {
-      regulateCardinalityForAll();
-    }
   }
 
-  private void regulateCardinalityForAll() {
-    double sampleSize;
-    if (getTableType().equalsIgnoreCase(REALTIME)) {
-      sampleSize = getSegmentFlushTime() * getNumMessagesPerSecInKafkaTopic();
-    } else {
-      sampleSize = getNumRecordsPerPush();
-    }
-
+  /**
+   * Cardinalities provided by users are relative to number of records per push, but we might end up creating multiple
+   * segments for each push. Using this method, cardinalities will be capped by the provided number of rows in a segment.
+   */
+  public void capCardinalities(int numRecordsInSegment) {
     _metaDataMap.keySet().forEach(colName -> {
-      int cardinality = _metaDataMap.get(colName).getCardinality();
-      double regulatedCardinality = regulateCardinalityInfinitePopulation(cardinality, sampleSize);
-      _metaDataMap.get(colName).setCardinality((int) Math.round(regulatedCardinality));
+      int cardinality = Math.min(numRecordsInSegment, _metaDataMap.get(colName).getCardinality());
+      _metaDataMap.get(colName).setCardinality(cardinality);
     });
   }
 
@@ -290,11 +279,6 @@ public class InputManager {
   }
 
   @JsonSetter(nulls = Nulls.SKIP)
-  public void setUseCardinalityNormalization(boolean cardinalityGlobal) {
-    _useCardinalityNormalization = cardinalityGlobal;
-  }
-
-  @JsonSetter(nulls = Nulls.SKIP)
   public void setFlagQueryRuleParams(FlagQueryRuleParams flagQueryRuleParams) {
     _flagQueryRuleParams = flagQueryRuleParams;
   }
@@ -402,8 +386,9 @@ public class InputManager {
     _overWrittenConfigs = overWrittenConfigs;
   }
 
-  public boolean isUseCardinalityNormalization() {
-    return _useCardinalityNormalization;
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setSegmentSizeRuleParams(SegmentSizeRuleParams segmentSizeRuleParams) {
+    _segmentSizeRuleParams = segmentSizeRuleParams;
   }
 
   public Set<String> getParsedQueries() {
@@ -535,6 +520,10 @@ public class InputManager {
     return _overWrittenConfigs;
   }
 
+  public SegmentSizeRuleParams getSegmentSizeRuleParams() {
+    return _segmentSizeRuleParams;
+  }
+
   public long getSizePerRecord() {
     return _sizePerRecord;
   }
@@ -566,55 +555,6 @@ public class InputManager {
   }
 
   /**
-   * Expectation of unique values in sample, E(Cardinality_sample)
-   * This can be used to calculate the cardinality per segment or per push, given the segment/ push size
-   * TODO: a iterative algorithm to recommend no dictionary columns and number of partitions,
-   *       It works like:
-   *          1. Start from a large number of partitions, recommend no index columns,
-   *             use this function to calculate per segment cardinality.
-   *             regulateCardinality(cardinality, segmentSize = pushSize/numPartitions, pushSize)
-   *          2. Then based on no-index result recommend numPartitions
-   *          3. re-recommend the no dictionary columns based on the new numPartitions
-   *       This way we can resolve the dependency between noIndexRecommendation <--> numPartitions
-   * @param cardinality the cardinality of population
-   * @param sampleSize a segment / push of data which is a sample from population
-   * @param populationSize total dataset size (rows), assuming even distribution of data,
-   *                       i.e. each unique value corresponds to population/cardinality rows
-   * @return Expected cardinality of sample
-   */
-  public double regulateCardinality(double cardinality, double sampleSize, double populationSize) {
-    double fpcReciprocal; // reciprocal of Finite Population Correction Factor, used when sampleSize> 0.05*population
-    if (sampleSize / populationSize < THRESHOLD_MIN_USE_FPC) {
-      fpcReciprocal = 1;
-    } else {
-      fpcReciprocal = Math.sqrt((populationSize - 1) / (populationSize - sampleSize));
-    }
-
-    // The probability of not selecting a given value in one sample is p0 = (cardinality - 1)/cardinality
-    // The probability of not selecting a given value in sampleSize samples is p0^sampleSize
-    // The probability of selecting a given value in sampleSize samples is 1 - p0^sampleSize
-    // The expectation of selected values is E(V1 + V2 + V3 + ... + V_cardinality) = E(V1) + E(V2) + ... (linearity of expectation)
-    // E(V1) = E(V2) = ... = E(V_cardinality) due to even distribution
-    // therefore E(V1 + V2 + V3 + ... + V_cardinality) = cardinality * E(V1) = cardinality * 1 * P(V1)
-    // Which is cardinality * (1 - p0^sampleSize) = cardinality * (1-((cardinality - 1) / cardinality)^(sampleSize))
-    return fpcReciprocal * cardinality * (1 - pow(((cardinality - 1) / cardinality), sampleSize));
-  }
-
-  /**
-   * No fix version of the above process, assuming very large population
-   */
-  public double regulateCardinalityInfinitePopulation(double cardinality, double sampleSize) {
-    // The probability of not selecting a given value in one sample is p0 = (cardinality - 1)/cardinality
-    // The probability of not selecting a given value in sampleSize samples is p0^sampleSize
-    // The probability of selecting a given value in sampleSize samples is 1 - p0^sampleSize
-    // The expectation of selected values is E(V1 + V2 + V3 + ... + V_cardinality) = E(V1) + E(V2) + ... (linearity of expectation)
-    // E(V1) = E(V2) = ... = E(V_cardinality) due to even distribution
-    // therefore E(V1 + V2 + V3 + ... + V_cardinality) = cardinality * E(V1) = cardinality * 1 * P(V1)
-    // Which is cardinality * (1 - p0^sampleSize) = cardinality * (1-((cardinality - 1) / cardinality)^(sampleSize))
-    return cardinality * (1 - pow(((cardinality - 1) / cardinality), sampleSize));
-  }
-
-  /**
    * Map a index-applicable dimension name to an 0<=integer<getNumDimsInvertedSortedApplicable,
    * to be used with {@link FixedLenBitset}
    * @param colName a dimension with no overwritten index
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/AbstractRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/AbstractRule.java
index 679cd73..b1e3990 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/AbstractRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/AbstractRule.java
@@ -35,4 +35,11 @@ public abstract class AbstractRule {
     _input = input;
     _output = output;
   }
+
+  /**
+   * Some rules have to be run even if the user has disabled them, because the output of these rules is used in other
+   * rules. This method is used to hide that output from the final result presented to the user.
+   */
+  public void hideOutput() {
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java
index 50d65f8..8980d0a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/RulesToExecute.java
@@ -30,6 +30,7 @@ import org.apache.pinot.controller.recommender.rules.impl.KafkaPartitionRule;
 import org.apache.pinot.controller.recommender.rules.impl.NoDictionaryOnHeapDictionaryJointRule;
 import org.apache.pinot.controller.recommender.rules.impl.PinotTablePartitionRule;
 import org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule;
+import org.apache.pinot.controller.recommender.rules.impl.SegmentSizeRule;
 import org.apache.pinot.controller.recommender.rules.impl.VariedLengthDictionaryRule;
 
 import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.RulesToExecute.*;
@@ -45,6 +46,8 @@ public class RulesToExecute {
   public static class RuleFactory {
     public static AbstractRule getRule(Rule rule, InputManager inputManager, ConfigManager outputManager) {
       switch (rule) {
+        case SegmentSizeRule:
+          return new SegmentSizeRule(inputManager, outputManager);
         case FlagQueryRule:
           return new FlagQueryRule(inputManager, outputManager);
         case InvertedSortedIndexJointRule:
@@ -69,6 +72,7 @@ public class RulesToExecute {
     }
   }
   // All rules will execute by default unless explicitly specifying "recommendInvertedSortedIndexJoint" = "false"
+  boolean _recommendSegmentSize = DEFAULT_RECOMMEND_SEGMENT_SIZE;
   boolean _recommendKafkaPartition = DEFAULT_RECOMMEND_KAFKA_PARTITION;
   boolean _recommendPinotTablePartition = DEFAULT_RECOMMEND_PINOT_TABLE_PARTITION;
   boolean _recommendInvertedSortedIndexJoint = DEFAULT_RECOMMEND_INVERTED_SORTED_INDEX_JOINT;
@@ -121,7 +125,12 @@ public class RulesToExecute {
 
   @JsonSetter(nulls = Nulls.SKIP)
   public void setRecommendRealtimeProvisioning(boolean recommendRealtimeProvisioning) {
-    _recommendPinotTablePartition = recommendRealtimeProvisioning;
+    _recommendRealtimeProvisioning = recommendRealtimeProvisioning;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setRecommendSegmentSize(boolean recommendSegmentSize) {
+    _recommendSegmentSize = recommendSegmentSize;
   }
 
   public boolean isRecommendVariedLengthDictionary() {
@@ -160,9 +169,14 @@ public class RulesToExecute {
     return _recommendRealtimeProvisioning;
   }
 
+  public boolean isRecommendSegmentSize() {
+    return _recommendSegmentSize;
+  }
+
   // Be careful with the sequence, each rule can execute individually
   // but a rule may depend on its previous rule when they both fired
   public enum Rule {
+    SegmentSizeRule, // This rule must be the first rule. It provides segment count, segment size, numRows in segments which are used in other rules. It also adjusts cardinality per segment for different columns.
     FlagQueryRule,
     KafkaPartitionRule,
     InvertedSortedIndexJointRule,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
index 5900c4d..433d5cd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.controller.recommender.rules.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
@@ -63,16 +65,12 @@ public class PinotTablePartitionRule extends AbstractRule {
 
   public PinotTablePartitionRule(InputManager input, ConfigManager output) {
     super(input, output);
-    this._params = input.getPartitionRuleParams();
+    _params = input.getPartitionRuleParams();
   }
 
   @Override
   public void run()
       throws InvalidInputException {
-    //**********Calculate size per record***************/
-    _input.estimateSizePerRecord();
-    //**************************************************/
-
     LOGGER.info("Recommending partition configurations");
 
     if (_input.getQps()
@@ -105,26 +103,23 @@ public class PinotTablePartitionRule extends AbstractRule {
 
     LOGGER.info("*Recommending number of partitions ");
     int numKafkaPartitions = _output.getPartitionConfig().getNumKafkaPartitions();
-    long offLineDataSizePerPush = _input.getNumRecordsPerPush() * _input.getSizePerRecord();
-    int optimalOfflinePartitions = (int) Math.ceil((double) offLineDataSizePerPush / _params.OPTIMAL_SIZE_PER_SEGMENT);
-    if (_input.getTableType().equalsIgnoreCase(REALTIME) || _input.getTableType().equalsIgnoreCase(HYBRID)) {
+    boolean isRealtimeTable = _input.getTableType().equalsIgnoreCase(REALTIME);
+    boolean isHybridTable = _input.getTableType().equalsIgnoreCase(HYBRID);
+    boolean isOfflineTable = _input.getTableType().equalsIgnoreCase(OFFLINE);
+    if (isRealtimeTable || isHybridTable) {
       //real time num of partitions should be the same value as the number of kafka partitions
       if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsRealtimeOverwritten()) {
         _output.getPartitionConfig().setNumPartitionsRealtime(numKafkaPartitions);
       }
     }
-    if (_input.getTableType().equalsIgnoreCase(OFFLINE)) {
+    if (isOfflineTable || isHybridTable) {
       //Offline partition num is dependent on the amount of data coming in on a given day.
       //Using a very high value of numPartitions for small dataset size will result in too many small sized segments.
       //We define have a desirable segment size OPTIMAL_SIZE_PER_SEGMENT
       //Divide the size of data coming in on a given day by OPTIMAL_SIZE_PER_SEGMENT we get the number of partitions.
       if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsOfflineOverwritten()) {
-        _output.getPartitionConfig().setNumPartitionsOffline((int) (optimalOfflinePartitions));
-      }
-    }
-    if (_input.getTableType().equalsIgnoreCase(HYBRID)) {
-      if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsOfflineOverwritten()) {
-        _output.getPartitionConfig().setNumPartitionsOffline(Math.min(optimalOfflinePartitions, numKafkaPartitions));
+        int optimalOfflinePartitions = (int) _output.getSegmentSizeRecommendations().getNumSegments();
+        _output.getPartitionConfig().setNumPartitionsOffline(optimalOfflinePartitions);
       }
     }
 
@@ -146,25 +141,40 @@ public class PinotTablePartitionRule extends AbstractRule {
       }
     });
 
-    List<Pair<String, Double>> columnNameWeightPairRank = new ArrayList<>();
+    List<Pair<String, Double>> columnNameToWeightPairs = new ArrayList<>();
     for (int i = 0; i < _input.getNumDims(); i++) {
       if (weights[i] > 0) {
-        columnNameWeightPairRank.add(Pair.of(_input.intToColName(i), weights[i]));
+        columnNameToWeightPairs.add(Pair.of(_input.intToColName(i), weights[i]));
       }
     }
-    if (columnNameWeightPairRank.isEmpty()) {
+    if (columnNameToWeightPairs.isEmpty()) {
       return;
     }
-    columnNameWeightPairRank.sort((a, b) -> b.getRight().compareTo(a.getRight()));
-    LOGGER.info("**Goodness of column to partition {}", columnNameWeightPairRank);
-    double topCandidatesThreshold =
-        columnNameWeightPairRank.get(0).getRight() * _params.THRESHOLD_RATIO_MIN_DIMENSION_PARTITION_TOP_CANDIDATES;
-    Optional<Pair<String, Double>> max = columnNameWeightPairRank.stream()
-        .filter(columnNameWeightPair -> columnNameWeightPair.getRight() > topCandidatesThreshold)
-        // filter out the dims with frequency < threshold (THRESHOLD_RATIO_DIMENSION_TO_PARTITION_TOP_CANDIDATES * max frequency)
-        .max(Comparator.comparing(columnNameWeightPair -> _input.getCardinality(columnNameWeightPair.getLeft())));
-    // get the dimension with highest cardinality
-    max.ifPresent(stringDoublePair -> _output.getPartitionConfig().setPartitionDimension(stringDoublePair.getLeft()));
+
+    LOGGER.info("**Goodness of column to partition {}", columnNameToWeightPairs);
+    int numPartitions = isOfflineTable || isHybridTable
+        ? _output.getPartitionConfig().getNumPartitionsOffline()
+        : _output.getPartitionConfig().getNumPartitionsRealtime();
+    Optional<String> colNameOpt = findBestColumnForPartitioning(columnNameToWeightPairs, _input::getCardinality,
+        _params.THRESHOLD_RATIO_MIN_DIMENSION_PARTITION_TOP_CANDIDATES, numPartitions);
+    colNameOpt.ifPresent(colName -> _output.getPartitionConfig().setPartitionDimension(colName));
+  }
+
+  @VisibleForTesting
+  static Optional<String> findBestColumnForPartitioning(List<Pair<String, Double>> columnNameToWeightPairs,
+      Function<String, Double> cardinalityExtractor, double topCandidateRatio, int numPartitions) {
+    return columnNameToWeightPairs.stream()
+        .filter(colToWeight -> cardinalityExtractor.apply(colToWeight.getLeft())
+            > numPartitions * PartitionRule.ACCEPTABLE_CARDINALITY_TO_NUM_PARTITIONS_RATIO)
+        .max(Comparator.comparingDouble(Pair::getRight))
+        .map(Pair::getRight)
+        .flatMap(maxWeight -> {
+          double topCandidatesThreshold = maxWeight * topCandidateRatio;
+          return columnNameToWeightPairs.stream()
+              .filter(colToWeight -> colToWeight.getRight() > topCandidatesThreshold)
+              .max(Comparator.comparingDouble(colToWeight -> cardinalityExtractor.apply(colToWeight.getLeft())))
+              .map(Pair::getLeft);
+        });
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
index 5f34fda..469ad4e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
@@ -84,7 +84,7 @@ public class RealtimeProvisioningRule extends AbstractRule {
         new MemoryEstimator(tableConfig,
             _input.getSchema(),
             _input.getSchemaWithMetadata(),
-            (int) _input.getNumRecordsPerPush(), // TODO we may not want to use numRecordsPerPush as the numRows for the completed segment we are going to generate. A more fine-grained number is needed which we need to figure out how to capture.
+            _params.getNumRowsInGeneratedSegment(),
             ingestionRatePerPartition,
             maxUsableHostMemoryByte,
             _params.getRealtimeTableRetentionHours());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/SegmentSizeRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/SegmentSizeRule.java
new file mode 100644
index 0000000..b507ac7
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/SegmentSizeRule.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.rules.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.recommender.exceptions.InvalidInputException;
+import org.apache.pinot.controller.recommender.io.ConfigManager;
+import org.apache.pinot.controller.recommender.io.InputManager;
+import org.apache.pinot.controller.recommender.realtime.provisioning.MemoryEstimator;
+import org.apache.pinot.controller.recommender.rules.AbstractRule;
+import org.apache.pinot.controller.recommender.rules.io.configs.SegmentSizeRecommendations;
+import org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants;
+import org.apache.pinot.controller.recommender.rules.io.params.SegmentSizeRuleParams;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/**
+ * This rule generates a segment based on the provided schema characteristics and then recommends the following
+ * using the size and number of records in the generated segment:
+ *   - number of segments
+ *   - number of records in each segment
+ *   - size of each segment (in bytes)
+ *
+ * The purpose of generating a segment is to estimate the size and number of rows of one sample segment. If the user
+ * already has a production segment in hand, they can provide both actualSegmentSizeMB and numRowsInActualSegment in
+ * the input; this rule then uses those values instead of generating a segment to derive them.
+ * It's worth noting that since this rule gets executed before other rules, we run into a chicken-and-egg problem.
+ *  - Index recommendation, dictionary, bloom filter rules will be run next.
+ *  - Only after the subsequent rules run and the index recommendation is available, will we know what a segment
+ *    might look like.
+ * So here we just assume dictionary on all dimensions and raw for metric columns and try to generate a segment quickly.
+ * To account for what the size might look like once all the index/dictionary/bloom recommendations are applied, the
+ * size of a generated segment is bumped up by INDEX_OVERHEAD_RATIO_FOR_SEGMENT_SIZE (currently 1.2, i.e. +20%).
+ */
+public class SegmentSizeRule extends AbstractRule {
+
+  static final int MEGA_BYTE = 1024 * 1024;
+
+  public SegmentSizeRule(InputManager input, ConfigManager output) {
+    super(input, output);
+  }
+
+  @Override
+  public void run()
+      throws InvalidInputException {
+    if (_input.getTableType().equalsIgnoreCase("REALTIME")) {
+      // no need to estimate segment size & optimal number of segments for realtime only tables;
+      // RT Provisioning Rule will have a comprehensive analysis on that
+      _output.setSegmentSizeRecommendations(new SegmentSizeRecommendations(
+          "Segment sizing for realtime-only tables is done via Realtime Provisioning Rule"));
+      return;
+    }
+
+    long segmentSize;
+    int numRows;
+    SegmentSizeRuleParams segmentSizeRuleParams = _input.getSegmentSizeRuleParams();
+    // An actual segment is usable only when BOTH its size and row count are provided; if either one is missing,
+    // the NOT_PROVIDED placeholder would yield a negative size or row count, so generate a sample segment instead.
+    if (segmentSizeRuleParams.getActualSegmentSizeMB() == RecommenderConstants.SegmentSizeRule.NOT_PROVIDED
+        || segmentSizeRuleParams.getNumRowsInActualSegment() == RecommenderConstants.SegmentSizeRule.NOT_PROVIDED) {
+      // generate a sample segment, measure it, and always clean it up afterwards
+      numRows = segmentSizeRuleParams.getNumRowsInGeneratedSegment();
+      File generatedSegmentDir = new MemoryEstimator.SegmentGenerator(_input._schemaWithMetaData, _input._schema,
+          createTableConfig(_input.getSchema()), numRows, true).generate();
+      try {
+        segmentSize = Math.round(FileUtils.sizeOfDirectory(generatedSegmentDir)
+            * RecommenderConstants.SegmentSizeRule.INDEX_OVERHEAD_RATIO_FOR_SEGMENT_SIZE);
+      } finally {
+        try {
+          FileUtils.deleteDirectory(generatedSegmentDir);
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot delete the generated segment directory", e);
+        }
+      }
+    } else {
+      // widen to long before multiplying: the int product overflows for segments of 2048 MB or more
+      segmentSize = (long) segmentSizeRuleParams.getActualSegmentSizeMB() * MEGA_BYTE;
+      numRows = segmentSizeRuleParams.getNumRowsInActualSegment();
+    }
+
+    // estimate optimal segment count & size parameters (desired size is widened to long for the same reason)
+    SegmentSizeRecommendations params =
+        estimate(segmentSize, (long) segmentSizeRuleParams.getDesiredSegmentSizeMB() * MEGA_BYTE, numRows,
+            _input.getNumRecordsPerPush());
+
+    // wire the recommendations and also update the cardinalities in input manager; the updated cardinalities are
+    // used in subsequent rules. Saturate rather than silently wrap if the row count does not fit in an int.
+    _output.setSegmentSizeRecommendations(params);
+    _input.capCardinalities((int) Math.min(params.getNumRowsPerSegment(), Integer.MAX_VALUE));
+  }
+
+  /**
+   * Estimate segment size parameters by extrapolation based on the number of records and size of the generated segment.
+   * The linear extrapolation used here is not optimal because of columnar way of storing data and usage of different
+   * indices. Another way would be to iteratively generate new segments with expected number of rows until the ideal
+   * segment is found, but that's costly because of the time it takes to generate segments. Although the extrapolation
+   * approach seems to be less accurate, it is chosen due to its performance.
+   *
+   * @param GSS  generated segment size, in bytes
+   * @param DSS  desired segment size, in bytes
+   * @param NRGS num records of generated segment
+   * @param NRPP num records per push
+   * @return recommendations on optimal segment count, size (in bytes), and number of records
+   */
+  @VisibleForTesting
+  SegmentSizeRecommendations estimate(long GSS, long DSS, int NRGS, long NRPP) {
+
+    // calc num rows in desired segment
+    double sizeRatio = (double) DSS / GSS;
+    long numRowsInDesiredSegment = Math.round(NRGS * sizeRatio);
+
+    // calc optimal num segments; at least one, even if a push holds fewer rows than one desired segment
+    long optimalNumSegments = Math.round(NRPP / (double) numRowsInDesiredSegment);
+    optimalNumSegments = Math.max(optimalNumSegments, 1);
+
+    // revise optimal num rows in segment so that the pushed rows are spread evenly across the segments
+    long optimalNumRowsInSegment = Math.round(NRPP / (double) optimalNumSegments);
+
+    // calc optimal segment size
+    double rowRatio = (double) optimalNumRowsInSegment / NRGS;
+    long optimalSegmentSize = Math.round(GSS * rowRatio);
+
+    return new SegmentSizeRecommendations(optimalNumRowsInSegment, optimalNumSegments, optimalSegmentSize);
+  }
+
+  private TableConfig createTableConfig(Schema schema) {
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(schema.getSchemaName())
+        .setNoDictionaryColumns(schema.getMetricNames())
+        .build();
+  }
+
+  @Override
+  public void hideOutput() {
+    _output.setSegmentSizeRecommendations(null);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/SegmentSizeRecommendations.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/SegmentSizeRecommendations.java
new file mode 100644
index 0000000..409e8c6
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/SegmentSizeRecommendations.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.rules.io.configs;
+
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.annotation.Nulls;
+
+
+/**
+ * Recommendations proposed by SegmentSizeRule: either sizing numbers or a message explaining why there are none.
+ */
+public class SegmentSizeRecommendations {
+
+  private long numRowsPerSegment; // recommended number of rows in each segment
+  private long numSegments; // recommended number of segments per push
+  private long segmentSize; // recommended size of each segment, in bytes
+  private String message; // set instead of the sizing numbers, e.g. for realtime-only tables
+
+  public SegmentSizeRecommendations(long numRowsPerSegment, long numSegments, long segmentSize) {
+    this.numRowsPerSegment = numRowsPerSegment;
+    this.numSegments = numSegments;
+    this.segmentSize = segmentSize;
+  }
+
+  public SegmentSizeRecommendations(String message) {
+    this.message = message;
+  }
+
+  public SegmentSizeRecommendations() {
+  }
+
+  public long getNumRowsPerSegment() {
+    return numRowsPerSegment;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setNumRowsPerSegment(long numRowsPerSegment) {
+    this.numRowsPerSegment = numRowsPerSegment;
+  }
+
+  public long getNumSegments() {
+    return numSegments;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setNumSegments(long numSegments) {
+    this.numSegments = numSegments;
+  }
+
+  public long getSegmentSize() {
+    return segmentSize;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setSegmentSize(long segmentSize) {
+    this.segmentSize = segmentSize;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setMessage(String message) {
+    this.message = message;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java
index 0d47628..a296252 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/PartitionRuleParams.java
@@ -33,9 +33,6 @@ public class PartitionRuleParams {
   // Below this QPS we do not need any partitioning
   public Long THRESHOLD_MIN_QPS_PARTITION = DEFAULT_THRESHOLD_MIN_QPS_PARTITION;
 
-  // The optimal size for an offline segment
-  public Long OPTIMAL_SIZE_PER_SEGMENT = DEFAULT_OPTIMAL_SIZE_PER_SEGMENT;
-
   // In the over all recommendation for partitioning, iff the frequency of top N-th candidate is larger than
   // THRESHOLD_RATIO_MIN_DIMENSION_PARTITION_TOP_CANDIDATES * frequency_of_top_one_candidate,
   // we will pick from [1st, nth] candidates with the largest cardinality as partitioning column
@@ -77,15 +74,6 @@ public class PartitionRuleParams {
     this.THRESHOLD_MIN_QPS_PARTITION = THRESHOLD_MIN_QPS_PARTITION;
   }
 
-  public Long getOPTIMAL_SIZE_PER_SEGMENT() {
-    return OPTIMAL_SIZE_PER_SEGMENT;
-  }
-
-  @JsonSetter(value = "OPTIMAL_SIZE_PER_SEGMENT", nulls = Nulls.SKIP)
-  public void setOPTIMAL_SIZE_PER_SEGMENT(long OPTIMAL_SIZE_PER_SEGMENT) {
-    this.OPTIMAL_SIZE_PER_SEGMENT = OPTIMAL_SIZE_PER_SEGMENT;
-  }
-
   public Double getTHRESHOLD_RATIO_MIN_DIMENSION_PARTITION_TOP_CANDIDATES() {
     return THRESHOLD_RATIO_MIN_DIMENSION_PARTITION_TOP_CANDIDATES;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RealtimeProvisioningRuleParams.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RealtimeProvisioningRuleParams.java
index b0a40ff..8f150bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RealtimeProvisioningRuleParams.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RealtimeProvisioningRuleParams.java
@@ -48,6 +48,8 @@ public class RealtimeProvisioningRuleParams {
   // Acceptable values for `number of hours` parameter. NumHours represents consumption duration.
   private int[] numHours = RecommenderConstants.RealtimeProvisioningRule.DEFAULT_NUM_HOURS;
 
+  // Number of rows for the segment that is going to be generated
+  private int numRowsInGeneratedSegment = RecommenderConstants.DEFAULT_NUM_ROWS_IN_GENERATED_SEGMENT;
 
   // Getters & Setters
 
@@ -102,4 +104,13 @@ public class RealtimeProvisioningRuleParams {
   public void setNumHours(int[] numHours) {
     this.numHours = numHours;
   }
+
+  public int getNumRowsInGeneratedSegment() {
+    return this.numRowsInGeneratedSegment;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setNumRowsInGeneratedSegment(int numRows) {
+    this.numRowsInGeneratedSegment = numRows;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
index 67d7f31..096c38f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
@@ -37,6 +37,7 @@ public class RecommenderConstants {
   }
 
   public static class RulesToExecute {
+    public static final boolean DEFAULT_RECOMMEND_SEGMENT_SIZE = true;
     public static final boolean DEFAULT_RECOMMEND_FLAG_QUERY = true;
     public static final boolean DEFAULT_RECOMMEND_VARIED_LENGTH_DICTIONARY = true;
     public static final boolean DEFAULT_RECOMMEND_KAFKA_PARTITION = true;
@@ -53,10 +54,10 @@ public class RecommenderConstants {
 
     public static final long DEFAULT_THRESHOLD_MAX_LATENCY_SLA_PARTITION = 1000;
     public static final long DEFAULT_THRESHOLD_MIN_QPS_PARTITION = 200;
-    public static final long DEFAULT_OPTIMAL_SIZE_PER_SEGMENT = 1000_000_000; //1GB
     public static final long DEFAULT_KAFKA_NUM_MESSAGES_PER_SEC_PER_PARTITION = 250;
     public static final double DEFAULT_THRESHOLD_RATIO_MIN_DIMENSION_PARTITION_TOP_CANDIDATES = 0.8d;
     public static final int DEFAULT_THRESHOLD_MAX_IN_LENGTH = 4;
+    public static final double ACCEPTABLE_CARDINALITY_TO_NUM_PARTITIONS_RATIO = 0.7;
   }
 
   public static class BloomFilterRule {
@@ -89,6 +90,13 @@ public class RecommenderConstants {
     public static final int[] DEFAULT_NUM_HOSTS = {3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
   }
 
+  public static class SegmentSizeRule {
+    public static final int DEFAULT_NUM_SEGMENTS = 1; // NOTE(review): unused by SegmentSizeRule; still needed?
+    public static final int DEFAULT_DESIRED_SEGMENT_SIZE_MB = 500; // default for desiredSegmentSizeMB
+    public static final int NOT_PROVIDED = -1; // marks actualSegmentSizeMB / numRowsInActualSegment as not given
+    public static final double INDEX_OVERHEAD_RATIO_FOR_SEGMENT_SIZE = 1.2; // +20% for later-recommended indexes
+  }
+
   public static final String PQL = "pql";
   public static final String SQL = "sql";
   public static final String OFFLINE = "offline";
@@ -119,4 +127,5 @@ public class RecommenderConstants {
   public static final int FIRST = 0;
   public static final int SECOND = 1;
 
+  public static final int DEFAULT_NUM_ROWS_IN_GENERATED_SEGMENT = 50_000;
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/SegmentSizeRuleParams.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/SegmentSizeRuleParams.java
new file mode 100644
index 0000000..362effc
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/SegmentSizeRuleParams.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.rules.io.params;
+
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.annotation.Nulls;
+
+
+/**
+ * Parameters used in SegmentSizeRule
+ */
+public class SegmentSizeRuleParams {
+
+  // Desired size of each recommended segment, in MB
+  private int desiredSegmentSizeMB = RecommenderConstants.SegmentSizeRule.DEFAULT_DESIRED_SEGMENT_SIZE_MB;
+
+  // Number of rows in the sample segment that is generated when actual segment info is not provided
+  private int numRowsInGeneratedSegment = RecommenderConstants.DEFAULT_NUM_ROWS_IN_GENERATED_SEGMENT;
+
+  // Size (in MB) of an existing production segment, or NOT_PROVIDED; set together with numRowsInActualSegment
+  private int actualSegmentSizeMB = RecommenderConstants.SegmentSizeRule.NOT_PROVIDED;
+
+  // Number of rows in that existing production segment, or NOT_PROVIDED; set together with actualSegmentSizeMB
+  private int numRowsInActualSegment = RecommenderConstants.SegmentSizeRule.NOT_PROVIDED;
+
+
+  // setter and getters
+
+  public int getDesiredSegmentSizeMB() {
+    return desiredSegmentSizeMB;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setDesiredSegmentSizeMB(int desiredSegmentSizeMB) {
+    this.desiredSegmentSizeMB = desiredSegmentSizeMB;
+  }
+
+  public int getNumRowsInGeneratedSegment() {
+    return numRowsInGeneratedSegment;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setNumRowsInGeneratedSegment(int numRowsInGeneratedSegment) {
+    this.numRowsInGeneratedSegment = numRowsInGeneratedSegment;
+  }
+
+  public int getActualSegmentSizeMB() {
+    return actualSegmentSizeMB;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setActualSegmentSizeMB(int actualSegmentSizeMB) {
+    this.actualSegmentSizeMB = actualSegmentSizeMB;
+  }
+
+  public int getNumRowsInActualSegment() {
+    return numRowsInActualSegment;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setNumRowsInActualSegment(int numRowsInActualSegment) {
+    this.numRowsInActualSegment = numRowsInActualSegment;
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
index 555b0a1..a050ec7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
@@ -36,6 +36,7 @@ import org.apache.pinot.controller.recommender.io.InputManager;
 import org.apache.pinot.controller.recommender.rules.AbstractRule;
 import org.apache.pinot.controller.recommender.rules.RulesToExecute;
 import org.apache.pinot.controller.recommender.rules.impl.InvertedSortedIndexJointRule;
+import org.apache.pinot.controller.recommender.rules.io.configs.SegmentSizeRecommendations;
 import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
 import org.apache.pinot.controller.recommender.rules.utils.QueryInvertedSortedIndexRecommender;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -47,9 +48,7 @@ import org.testng.annotations.Test;
 import static org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.CONSUMING_MEMORY_PER_HOST;
 import static org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.OPTIMAL_SEGMENT_SIZE;
 import static org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.TOTAL_MEMORY_USED_PER_HOST;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class TestConfigEngine {
@@ -229,6 +228,10 @@ public class TestConfigEngine {
       throws InvalidInputException, IOException {
     loadInput("recommenderInput/PinotTablePartitionRuleInput.json");
 
+    // segment size recommendations get populated by SegmentSize Rule; hard-coding the values here
+    _input._overWrittenConfigs.setSegmentSizeRecommendations(
+        new SegmentSizeRecommendations(/*numRows=*/1_000_000, /*numSegments=*/4, /*segmentSize=*/1_000_000));
+
     AbstractRule abstractRule = RulesToExecute.RuleFactory
         .getRule(RulesToExecute.Rule.KafkaPartitionRule, _input, _input._overWrittenConfigs);
     abstractRule.run();
@@ -376,6 +379,42 @@ public class TestConfigEngine {
     assertTrue(output.isAggregateMetrics());
   }
 
+  @Test
+  void testSegmentSizeRule() throws Exception {
+    SegmentSizeRecommendations recommendations =
+        runRecommenderDriver("recommenderInput/SegmentSizeRuleInput.json").getSegmentSizeRecommendations();
+    assertEquals(recommendations.getNumSegments(), 2);
+    assertEquals(recommendations.getNumRowsPerSegment(), 50_000);
+  }
+
+  @Test
+  void testSegmentSizeRule_noNeedToGenerateSegment() throws Exception {
+    SegmentSizeRecommendations recommendations = runRecommenderDriver(
+        "recommenderInput/SegmentSizeRuleInput_noNeedToGenerateSegment.json").getSegmentSizeRecommendations();
+    assertEquals(recommendations.getNumSegments(), 2);
+    assertEquals(recommendations.getNumRowsPerSegment(), 50_000);
+  }
+
+  @Test
+  void testSegmentSizeRule_ruleIsDisabledButItNeedsToBeSilentlyRun() throws Exception {
+    ConfigManager output =
+        runRecommenderDriver("recommenderInput/SegmentSizeRuleInput_ruleIsDisableButItNeedsToBeSilentlyRun.json");
+    assertEquals(output.getPartitionConfig().getPartitionDimension(), "e");
+    assertEquals(output.getPartitionConfig().getNumPartitionsOffline(), 2);
+    assertNull(output.getSegmentSizeRecommendations()); // hidden: the disabled rule only ran silently
+  }
+
+  @Test
+  void testSegmentSizeRule_realtimeOnlyTable() throws Exception {
+    SegmentSizeRecommendations recommendations = runRecommenderDriver(
+        "recommenderInput/SegmentSizeRuleInput_realtimeOnlyTable.json").getSegmentSizeRecommendations();
+    assertEquals(recommendations.getMessage(),
+        "Segment sizing for realtime-only tables is done via Realtime Provisioning Rule");
+    assertEquals(recommendations.getNumSegments(), 0);
+    assertEquals(recommendations.getSegmentSize(), 0);
+    assertEquals(recommendations.getNumRowsPerSegment(), 0);
+  }
+
   private void testRealtimeProvisioningRule(String fileName) throws Exception {
     ConfigManager output = runRecommenderDriver(fileName);
     Map<String, Map<String, String>> recommendations = output.getRealtimeProvisioningRecommendations();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRuleTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRuleTest.java
new file mode 100644
index 0000000..da29d33
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRuleTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.rules.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PinotTablePartitionRuleTest {
+
+  private static final double TOP_CANDIDATE_RATIO = 0.8;
+  private static final int NUM_PARTITIONS = 32;
+
+  @Test
+  public void testFindBestColumnForPartitioning() {
+    // d has max weight, but c is also a top candidate and has more cardinality
+    String[] columnNames = {"a", "b", "c", "d", "e"};
+    double[] cardinalities = {100, 90, 80, 70, 60};
+    double[] weights = {0.2, 0.3, 0.7, 0.8, 0.5};
+    assertEquals(findBestColumn(columnNames, cardinalities, weights), Optional.of("c"));
+
+    // d has max weight and max cardinality (same weights as above)
+    cardinalities = new double[] {100, 90, 80, 200, 60};
+    assertEquals(findBestColumn(columnNames, cardinalities, weights), Optional.of("d"));
+
+    // d has max weight, but its cardinality (10) is below the NUM_PARTITIONS-based threshold
+    cardinalities = new double[] {100, 90, 80, 10, 60};
+    weights = new double[] {0.1, 0.1, 0.4, 0.8, 0.2};
+    assertEquals(findBestColumn(columnNames, cardinalities, weights), Optional.of("c"));
+
+    // no column's cardinality is above that threshold, so no partitioning column is recommended
+    cardinalities = new double[] {10, 9, 8, 7, 6};
+    assertEquals(findBestColumn(columnNames, cardinalities, weights), Optional.empty());
+  }
+
+  private Optional<String> findBestColumn(String[] columnNames, double[] cardinalities, double[] weights) {
+    List<Pair<String, Double>> colNameToWeightPairs = new ArrayList<>();
+    Map<String, Double> colNameToCardinality = new HashMap<>();
+    for (int i = 0; i < columnNames.length; i++) {
+      colNameToCardinality.put(columnNames[i], cardinalities[i]);
+      colNameToWeightPairs.add(ImmutablePair.of(columnNames[i], weights[i]));
+    }
+    return PinotTablePartitionRule.findBestColumnForPartitioning(
+        colNameToWeightPairs,
+        colNameToCardinality::get,
+        TOP_CANDIDATE_RATIO,
+        NUM_PARTITIONS
+    );
+  }
+}
\ No newline at end of file
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/SegmentSizeRuleTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/SegmentSizeRuleTest.java
new file mode 100644
index 0000000..aab8952
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/rules/impl/SegmentSizeRuleTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.rules.impl;
+
+import org.apache.pinot.controller.recommender.rules.io.configs.SegmentSizeRecommendations;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.controller.recommender.rules.impl.SegmentSizeRule.MEGA_BYTE;
+import static org.testng.Assert.*;
+
+
+public class SegmentSizeRuleTest {
+
+  private static final SegmentSizeRule RULE = new SegmentSizeRule(null, null);
+  private static final int MILLION = 1_000_000;
+
+  /*
+   * Abbreviations follow SegmentSizeRule#estimate:
+   *   NRPP -> num records per push
+   *   NRGS -> num records of generated segment
+   *   GSS  -> generated segment size
+   *   DSS  -> desired segment size
+   * testEstimate shares one generated segment (5M rows in 50 MB) and one desired size (120 MB), so a desired
+   * segment holds about 12M rows; only the number of records per push varies between its cases.
+   */
+  private static final int NRGS = 5 * MILLION;
+  private static final long GSS = 50 * MEGA_BYTE;
+  private static final int DSS = 120 * MEGA_BYTE;
+
+  @Test
+  public void testEstimate() {
+    // 20M / 12M rounds to 2 segments of 10M rows (100 MB) each
+    assertEstimate(20 * MILLION, 2, 100 * MEGA_BYTE, 10 * MILLION);
+
+    // 22M / 12M rounds to 2 segments of 11M rows (110 MB) each
+    assertEstimate(22 * MILLION, 2, 110 * MEGA_BYTE, 11 * MILLION);
+
+    // 18M / 12M = 1.5 rounds up to 2 segments of 9M rows (90 MB) each
+    assertEstimate(18 * MILLION, 2, 90 * MEGA_BYTE, 9 * MILLION);
+
+    // 16M / 12M rounds down to a single segment holding all 16M rows (160 MB)
+    assertEstimate(16 * MILLION, 1, 160 * MEGA_BYTE, 16 * MILLION);
+
+    // a push smaller than one desired segment still gets exactly one segment
+    assertEstimate(2 * MILLION, 1, 20 * MEGA_BYTE, 2 * MILLION);
+  }
+
+  @Test
+  public void testEstimateWhenPushFillsDesiredSegmentsExactly() {
+    // 5M rows in 50 MB with 100 MB desired => 10M rows per segment, so 30M rows make exactly 3 full segments
+    SegmentSizeRecommendations params = RULE.estimate(50L * MEGA_BYTE, 100 * MEGA_BYTE, 5 * MILLION, 30L * MILLION);
+    assertEquals(params.getNumSegments(), 3);
+    assertEquals(params.getSegmentSize(), 100 * MEGA_BYTE);
+    assertEquals(params.getNumRowsPerSegment(), 10 * MILLION);
+  }
+
+  /**
+   * Runs {@link SegmentSizeRule#estimate} on the generated/desired segment shared by testEstimate.
+   *
+   * @param numRecordsPerPush         NRPP passed to estimate
+   * @param expectedNumSegments       expected number of segments
+   * @param expectedSegmentSize       expected size of each segment, in bytes
+   * @param expectedNumRowsPerSegment expected number of rows in each segment
+   */
+  private static void assertEstimate(long numRecordsPerPush, long expectedNumSegments, long expectedSegmentSize,
+      long expectedNumRowsPerSegment) {
+    SegmentSizeRecommendations params = RULE.estimate(GSS, DSS, NRGS, numRecordsPerPush);
+    assertEquals(params.getNumSegments(), expectedNumSegments);
+    assertEquals(params.getSegmentSize(), expectedSegmentSize);
+    assertEquals(params.getNumRowsPerSegment(), expectedNumRowsPerSegment);
+  }
+}
\ No newline at end of file
diff --git a/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json
index ee38092..8f7d29e 100644
--- a/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/BloomFilterInput.json
@@ -12,6 +12,7 @@
         "name": "b",
         "dataType": "DOUBLE",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":1.5
       },
       {
@@ -24,6 +25,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -31,18 +33,21 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":18,
+        "singleValueField": false,
         "numValuesPerEntry":4
       },
       {
         "name": "f",
         "dataType": "DOUBLE",
         "cardinality":13,
+        "singleValueField": false,
         "numValuesPerEntry":3
       },
       {
         "name": "g",
         "dataType": "STRING",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
@@ -72,7 +77,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json b/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json
index f1f083a..78f2b2b 100644
--- a/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/DataSizeCalculationInput.json
@@ -18,12 +18,14 @@
         "name": "c",
         "dataType": "DOUBLE",
         "cardinality":2000,
+        "singleValueField": false,
         "numValuesPerEntry":4
       },
       {
         "name": "d",
         "dataType": "STRING",
         "cardinality":1000,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       }
diff --git a/pinot-controller/src/test/resources/recommenderInput/EmptyQueriesInput.json b/pinot-controller/src/test/resources/recommenderInput/EmptyQueriesInput.json
index d16e8f6..65e7968 100644
--- a/pinot-controller/src/test/resources/recommenderInput/EmptyQueriesInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/EmptyQueriesInput.json
@@ -24,6 +24,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -31,6 +32,7 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":18,
+        "singleValueField": false,
         "numValuesPerEntry":4
       }
     ],
@@ -39,7 +41,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json b/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
index 4fc7446..0f256bc 100644
--- a/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/FlagQueryInput.json
@@ -12,6 +12,7 @@
         "name": "b",
         "dataType": "DOUBLE",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":1.5
       },
       {
@@ -24,6 +25,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -31,18 +33,21 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":18,
+        "singleValueField": false,
         "numValuesPerEntry":4
       },
       {
         "name": "f",
         "dataType": "DOUBLE",
         "cardinality":13,
+        "singleValueField": false,
         "numValuesPerEntry":3
       },
       {
         "name": "g",
         "dataType": "STRING",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
@@ -73,7 +78,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/InvalidInput1.json b/pinot-controller/src/test/resources/recommenderInput/InvalidInput1.json
index f14bba1..7914980 100644
--- a/pinot-controller/src/test/resources/recommenderInput/InvalidInput1.json
+++ b/pinot-controller/src/test/resources/recommenderInput/InvalidInput1.json
@@ -12,6 +12,7 @@
         "name": "b",
         "dataType": "BYTES",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":2
       },
       {
@@ -24,6 +25,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -31,6 +33,7 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":18,
+        "singleValueField": false,
         "numValuesPerEntry":4
       }
     ],
@@ -39,7 +42,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/InvalidInput2.json b/pinot-controller/src/test/resources/recommenderInput/InvalidInput2.json
index c29bdc9..9c43551 100644
--- a/pinot-controller/src/test/resources/recommenderInput/InvalidInput2.json
+++ b/pinot-controller/src/test/resources/recommenderInput/InvalidInput2.json
@@ -24,6 +24,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -31,6 +32,7 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":18,
+        "singleValueField": false,
         "numValuesPerEntry":4
       }
     ],
@@ -39,7 +41,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
index 4bf697f..f2e8496 100644
--- a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput.json
@@ -12,6 +12,7 @@
         "name": "b",
         "dataType": "DOUBLE",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":1.5
       },
       {
@@ -24,6 +25,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -72,7 +74,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json
index e8f370f..b2e879b 100644
--- a/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json
+++ b/pinot-controller/src/test/resources/recommenderInput/KafkaPartitionRuleInput2.json
@@ -12,6 +12,7 @@
         "name": "b",
         "dataType": "DOUBLE",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":1.5
       },
       {
@@ -24,6 +25,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -31,18 +33,21 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":18,
+        "singleValueField": false,
         "numValuesPerEntry":4
       },
       {
         "name": "f",
         "dataType": "DOUBLE",
         "cardinality":13,
+        "singleValueField": false,
         "numValuesPerEntry":3
       },
       {
         "name": "g",
         "dataType": "STRING",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
@@ -72,7 +77,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json b/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json
index 80218b7..0bfe76e 100644
--- a/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/NoDictionaryOnHeapDictionaryJointRuleInput.json
@@ -12,12 +12,14 @@
         "name": "b",
         "dataType": "DOUBLE",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":1.5
       },
       {
         "name": "c",
         "dataType": "STRING",
         "cardinality": 100000000,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
@@ -32,18 +34,21 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":10000000,
+        "singleValueField": false,
         "numValuesPerEntry":4
       },
       {
         "name": "f",
         "dataType": "DOUBLE",
         "cardinality":20,
+        "singleValueField": false,
         "numValuesPerEntry":3
       },
       {
         "name": "g",
         "dataType": "STRING",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
diff --git a/pinot-controller/src/test/resources/recommenderInput/PinotTablePartitionRuleInput.json b/pinot-controller/src/test/resources/recommenderInput/PinotTablePartitionRuleInput.json
index a506320..4f88f8f 100644
--- a/pinot-controller/src/test/resources/recommenderInput/PinotTablePartitionRuleInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/PinotTablePartitionRuleInput.json
@@ -12,12 +12,14 @@
         "name": "b",
         "dataType": "DOUBLE",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":1.5
       },
       {
         "name": "c",
         "dataType": "STRING",
         "cardinality": 100000000,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
@@ -32,18 +34,21 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":10000000,
+        "singleValueField": false,
         "numValuesPerEntry":4
       },
       {
         "name": "f",
         "dataType": "DOUBLE",
         "cardinality":20,
+        "singleValueField": false,
         "numValuesPerEntry":3
       },
       {
         "name": "g",
         "dataType": "STRING",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
@@ -73,7 +78,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
index a846ecb..89b5915 100644
--- a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
+++ b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
@@ -135,7 +135,7 @@
   },
   "qps": 150,
   "numMessagesPerSecInKafkaTopic":1000,
-  "numRecordsPerPush":10000,
+  "numRecordsPerPush":10000000,
   "tableType": "HYBRID",
   "latencySLA": 500,
   "rulesToExecute": {
@@ -159,7 +159,8 @@
     "realtimeTableRetentionHours": 72,
     "maxUsableHostMemory": "60G",
     "numHours": [2, 4, 6, 8, 10, 12],
-    "numHosts": [3, 6, 9, 12, 15, 18, 21]
+    "numHosts": [3, 6, 9, 12, 15, 18, 21],
+    "numRowsInGeneratedSegment": 10000
   },
   "overWrittenConfigs": {
     "indexConfig": {
diff --git a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_timeColumn.json b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_timeColumn.json
index cd568d5..fb0aeb8 100644
--- a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_timeColumn.json
+++ b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_timeColumn.json
@@ -135,7 +135,6 @@
   },
   "qps": 150,
   "numMessagesPerSecInKafkaTopic":1000,
-  "numRecordsPerPush":10000,
   "tableType": "HYBRID",
   "latencySLA": 500,
   "rulesToExecute": {
@@ -159,7 +158,8 @@
     "realtimeTableRetentionHours": 72,
     "maxUsableHostMemory": "60G",
     "numHours": [2, 4, 6, 8, 10, 12],
-    "numHosts": [3, 6, 9, 12, 15, 18, 21]
+    "numHosts": [3, 6, 9, 12, 15, 18, 21],
+    "numRowsInGeneratedSegment": 10000
   },
   "overWrittenConfigs": {
     "indexConfig": {
diff --git a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput.json
similarity index 91%
copy from pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
copy to pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput.json
index a846ecb..248001b 100644
--- a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
+++ b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput.json
@@ -5,7 +5,7 @@
       {
         "name": "a",
         "dataType": "INT",
-        "cardinality":20,
+        "cardinality":70000,
         "numValuesPerEntry":1
       },
       {
@@ -133,13 +133,18 @@
     "select f from tableName where (a=0 and b=1) or c=7 or (d = 7 and e =1)": 2,
     "select f from tableName where t between 1 and 1000": 2
   },
-  "qps": 150,
+  "qps": 400,
   "numMessagesPerSecInKafkaTopic":1000,
-  "numRecordsPerPush":10000,
+  "numRecordsPerPush":100000,
   "tableType": "HYBRID",
   "latencySLA": 500,
   "rulesToExecute": {
-    "recommendRealtimeProvisioning": true
+    "recommendSegmentSize": true,
+    "recommendRealtimeProvisioning": false
+  },
+  "segmentSizeRuleParams": {
+    "desiredSegmentSizeMB": 5,
+    "numRowsInGeneratedSegment": 50000
   },
   "partitionRuleParams": {
     "THRESHOLD_MAX_LATENCY_SLA_PARTITION": 1001
@@ -153,14 +158,6 @@
   "noDictionaryOnHeapDictionaryJointRuleParams": {
     "THRESHOLD_MIN_PERCENT_DICTIONARY_STORAGE_SAVE" : 0.96
   },
-  "realtimeProvisioningRuleParams": {
-    "numPartitions": 10,
-    "numReplicas": 3,
-    "realtimeTableRetentionHours": 72,
-    "maxUsableHostMemory": "60G",
-    "numHours": [2, 4, 6, 8, 10, 12],
-    "numHosts": [3, 6, 9, 12, 15, 18, 21]
-  },
   "overWrittenConfigs": {
     "indexConfig": {
       "invertedIndexColumns": ["a","b"],
diff --git a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_noNeedToGenerateSegment.json
similarity index 91%
copy from pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
copy to pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_noNeedToGenerateSegment.json
index a846ecb..b206932 100644
--- a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
+++ b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_noNeedToGenerateSegment.json
@@ -5,7 +5,7 @@
       {
         "name": "a",
         "dataType": "INT",
-        "cardinality":20,
+        "cardinality":70000,
         "numValuesPerEntry":1
       },
       {
@@ -133,13 +133,19 @@
     "select f from tableName where (a=0 and b=1) or c=7 or (d = 7 and e =1)": 2,
     "select f from tableName where t between 1 and 1000": 2
   },
-  "qps": 150,
+  "qps": 400,
   "numMessagesPerSecInKafkaTopic":1000,
-  "numRecordsPerPush":10000,
+  "numRecordsPerPush":100000,
   "tableType": "HYBRID",
   "latencySLA": 500,
   "rulesToExecute": {
-    "recommendRealtimeProvisioning": true
+    "recommendSegmentSize": true,
+    "recommendRealtimeProvisioning": false
+  },
+  "segmentSizeRuleParams": {
+    "desiredSegmentSizeMB": 5,
+    "actualSegmentSizeMB": 4,
+    "numRowsInActualSegment": 45000
   },
   "partitionRuleParams": {
     "THRESHOLD_MAX_LATENCY_SLA_PARTITION": 1001
@@ -153,14 +159,6 @@
   "noDictionaryOnHeapDictionaryJointRuleParams": {
     "THRESHOLD_MIN_PERCENT_DICTIONARY_STORAGE_SAVE" : 0.96
   },
-  "realtimeProvisioningRuleParams": {
-    "numPartitions": 10,
-    "numReplicas": 3,
-    "realtimeTableRetentionHours": 72,
-    "maxUsableHostMemory": "60G",
-    "numHours": [2, 4, 6, 8, 10, 12],
-    "numHosts": [3, 6, 9, 12, 15, 18, 21]
-  },
   "overWrittenConfigs": {
     "indexConfig": {
       "invertedIndexColumns": ["a","b"],
diff --git a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_realtimeOnlyTable.json
similarity index 69%
copy from pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
copy to pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_realtimeOnlyTable.json
index a846ecb..f3ff4c1 100644
--- a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
+++ b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_realtimeOnlyTable.json
@@ -5,7 +5,7 @@
       {
         "name": "a",
         "dataType": "INT",
-        "cardinality":20,
+        "cardinality":70000,
         "numValuesPerEntry":1
       },
       {
@@ -32,9 +32,7 @@
       {
         "name": "e",
         "dataType": "LONG",
-        "cardinality":18,
-        "numValuesPerEntry":4,
-        "singleValueField": false
+        "cardinality":18
       },
       {
         "name": "f",
@@ -127,40 +125,22 @@
     ]
   },
   "queriesWithWeights":{
-    "select i from tableName where b in (2,4) and ((a in (1,2,3) and e = 4) or c = 7) and d in ('#VALUES', 23) and t > 500": 1,
-    "select j from tableName where (a=3 and (h = 5 or f >34) and REGEXP_LIKE(i, 'as*')) or ((f = 3  or j in ('#VALUES', 4)) and REGEXP_LIKE(d, 'fl*'))": 2,
-    "select f from tableName where (a=0 or (b=1 and (e in ('#VALUES',2) or c=7))) and TEXT_MATCH(d, 'dasd') and MAX(MAX(h,i),j)=4 and t<3": 4,
-    "select f from tableName where (a=0 and b=1) or c=7 or (d = 7 and e =1)": 2,
-    "select f from tableName where t between 1 and 1000": 2
+    "select m, n from tableName where a = 3 and e = 4": 0.7,
+    "select sum(n) from tableName where e = 5": 0.3
   },
-  "qps": 150,
+  "qps": 400,
   "numMessagesPerSecInKafkaTopic":1000,
-  "numRecordsPerPush":10000,
-  "tableType": "HYBRID",
+  "numRecordsPerPush":10000000,
+  "tableType": "REALTIME",
   "latencySLA": 500,
   "rulesToExecute": {
-    "recommendRealtimeProvisioning": true
+    "recommendSegmentSize": true,
+    "recommendPinotTablePartition": true,
+    "recommendRealtimeProvisioning": false
   },
   "partitionRuleParams": {
     "THRESHOLD_MAX_LATENCY_SLA_PARTITION": 1001
   },
-  "bloomFilterRuleParams": {
-    "THRESHOLD_MIN_PERCENT_EQ_BLOOMFILTER" : 0.51
-  },
-  "invertedSortedIndexJointRuleParams": {
-    "THRESHOLD_RATIO_MIN_GAIN_DIFF_BETWEEN_ITERATION" : 0.06
-  },
-  "noDictionaryOnHeapDictionaryJointRuleParams": {
-    "THRESHOLD_MIN_PERCENT_DICTIONARY_STORAGE_SAVE" : 0.96
-  },
-  "realtimeProvisioningRuleParams": {
-    "numPartitions": 10,
-    "numReplicas": 3,
-    "realtimeTableRetentionHours": 72,
-    "maxUsableHostMemory": "60G",
-    "numHours": [2, 4, 6, 8, 10, 12],
-    "numHosts": [3, 6, 9, 12, 15, 18, 21]
-  },
   "overWrittenConfigs": {
     "indexConfig": {
       "invertedIndexColumns": ["a","b"],
diff --git a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_ruleIsDisableButItNeedsToBeSilentlyRun.json
similarity index 69%
copy from pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
copy to pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_ruleIsDisableButItNeedsToBeSilentlyRun.json
index a846ecb..2098c90 100644
--- a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_dateTimeColumn.json
+++ b/pinot-controller/src/test/resources/recommenderInput/SegmentSizeRuleInput_ruleIsDisableButItNeedsToBeSilentlyRun.json
@@ -5,7 +5,7 @@
       {
         "name": "a",
         "dataType": "INT",
-        "cardinality":20,
+        "cardinality":70000,
         "numValuesPerEntry":1
       },
       {
@@ -32,9 +32,7 @@
       {
         "name": "e",
         "dataType": "LONG",
-        "cardinality":18,
-        "numValuesPerEntry":4,
-        "singleValueField": false
+        "cardinality":18
       },
       {
         "name": "f",
@@ -127,40 +125,22 @@
     ]
   },
   "queriesWithWeights":{
-    "select i from tableName where b in (2,4) and ((a in (1,2,3) and e = 4) or c = 7) and d in ('#VALUES', 23) and t > 500": 1,
-    "select j from tableName where (a=3 and (h = 5 or f >34) and REGEXP_LIKE(i, 'as*')) or ((f = 3  or j in ('#VALUES', 4)) and REGEXP_LIKE(d, 'fl*'))": 2,
-    "select f from tableName where (a=0 or (b=1 and (e in ('#VALUES',2) or c=7))) and TEXT_MATCH(d, 'dasd') and MAX(MAX(h,i),j)=4 and t<3": 4,
-    "select f from tableName where (a=0 and b=1) or c=7 or (d = 7 and e =1)": 2,
-    "select f from tableName where t between 1 and 1000": 2
+    "select m, n from tableName where a = 3 and e = 4": 0.7,
+    "select sum(n) from tableName where e = 5": 0.3
   },
-  "qps": 150,
+  "qps": 400,
   "numMessagesPerSecInKafkaTopic":1000,
-  "numRecordsPerPush":10000,
+  "numRecordsPerPush":10000000,
   "tableType": "HYBRID",
   "latencySLA": 500,
   "rulesToExecute": {
-    "recommendRealtimeProvisioning": true
+    "recommendSegmentSize": false,
+    "recommendPinotTablePartition": true,
+    "recommendRealtimeProvisioning": false
   },
   "partitionRuleParams": {
     "THRESHOLD_MAX_LATENCY_SLA_PARTITION": 1001
   },
-  "bloomFilterRuleParams": {
-    "THRESHOLD_MIN_PERCENT_EQ_BLOOMFILTER" : 0.51
-  },
-  "invertedSortedIndexJointRuleParams": {
-    "THRESHOLD_RATIO_MIN_GAIN_DIFF_BETWEEN_ITERATION" : 0.06
-  },
-  "noDictionaryOnHeapDictionaryJointRuleParams": {
-    "THRESHOLD_MIN_PERCENT_DICTIONARY_STORAGE_SAVE" : 0.96
-  },
-  "realtimeProvisioningRuleParams": {
-    "numPartitions": 10,
-    "numReplicas": 3,
-    "realtimeTableRetentionHours": 72,
-    "maxUsableHostMemory": "60G",
-    "numHours": [2, 4, 6, 8, 10, 12],
-    "numHosts": [3, 6, 9, 12, 15, 18, 21]
-  },
   "overWrittenConfigs": {
     "indexConfig": {
       "invertedIndexColumns": ["a","b"],
diff --git a/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json b/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json
index 190e4ef..c1d5c07 100644
--- a/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/SortedInvertedIndexInput.json
@@ -12,6 +12,7 @@
         "name": "b",
         "dataType": "DOUBLE",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":1.5
       },
       {
@@ -24,6 +25,7 @@
         "name": "d",
         "dataType": "STRING",
         "cardinality":41,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 27
       },
@@ -31,18 +33,21 @@
         "name": "e",
         "dataType": "LONG",
         "cardinality":18,
+        "singleValueField": false,
         "numValuesPerEntry":4
       },
       {
         "name": "f",
         "dataType": "DOUBLE",
         "cardinality":13,
+        "singleValueField": false,
         "numValuesPerEntry":3
       },
       {
         "name": "g",
         "dataType": "STRING",
         "cardinality":6,
+        "singleValueField": false,
         "numValuesPerEntry":2,
         "averageLength" : 100
       },
@@ -73,7 +78,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {
diff --git a/pinot-controller/src/test/resources/recommenderInput/VariedLengthDictionaryInput.json b/pinot-controller/src/test/resources/recommenderInput/VariedLengthDictionaryInput.json
index 7bec6cb..2ac264d 100644
--- a/pinot-controller/src/test/resources/recommenderInput/VariedLengthDictionaryInput.json
+++ b/pinot-controller/src/test/resources/recommenderInput/VariedLengthDictionaryInput.json
@@ -39,7 +39,7 @@
         "name": "k",
         "dataType": "DOUBLE",
         "cardinality":10000,
-        "numValuesPerEntry":2,
+        "numValuesPerEntry":1,
         "averageLength" : 100
       },
       {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org