You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/02/18 04:02:07 UTC

[incubator-pinot] branch master updated: Improve Real Time Provisioning Helper tool (#6546)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c3af59  Improve Real Time Provisioning Helper tool (#6546)
4c3af59 is described below

commit 4c3af59e19c33fdb9162589b18cff8040ccb3b75
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Feb 17 20:01:48 2021 -0800

    Improve Real Time Provisioning Helper tool (#6546)
    
    * Improve Realtime Provisioning Helper tool
    
    * Fix license header issue - was missing a `*`
    
    * Use existing schemaWithMetadata class instead of dataCharacteristics
    
    * Fix license header issue - was missing a `*`
    
    * Move data objects to pinot-controller
---
 .../controller/recommender/io/InputManager.java    |   5 +-
 .../io/metadata/DateTimeFieldSpecMetadata.java     |  54 ++++++
 .../recommender/io/metadata/FieldMetadata.java     |   6 +-
 .../io/metadata/SchemaWithMetaData.java            |   8 +-
 .../io/metadata/TimeFieldSpecMetadata.java         |  27 +--
 .../io/metadata/TimeGranularitySpecMetadata.java   |  40 +++++
 .../rules/io/params/RecommenderConstants.java      |   2 +-
 pinot-tools/pom.xml                                |   5 +
 .../tools/admin/command/GenerateDataCommand.java   |  10 +-
 .../command/RealtimeProvisioningHelperCommand.java |  44 ++++-
 .../pinot/tools/data/generator/BytesGenerator.java |  47 +++++
 .../pinot/tools/data/generator/DataGenerator.java  | 104 ++++++++---
 .../tools/data/generator/DataGeneratorSpec.java    |  16 +-
 .../tools/data/generator/GeneratorFactory.java     |  15 +-
 .../data/generator/MultiValueGeneratorHelper.java  |  54 ++++++
 .../tools/data/generator/NumberGenerator.java      |  34 ++--
 .../tools/data/generator/StringGenerator.java      |  26 ++-
 .../pinot/tools/data/generator/TimeGenerator.java  |  70 ++++++++
 .../realtime/provisioning/MemoryEstimator.java     | 200 +++++++++++++++++++++
 .../generator/MultiValueGeneratorHelperTest.java   |  49 +++++
 .../tools/data/generator/TimeGeneratorTest.java    |  52 ++++++
 .../realtime/provisioning/MemoryEstimatorTest.java | 129 +++++++++++++
 .../memory_estimation/schema-with-metadata.json    |  93 ++++++++++
 .../schema-with-metadata__dateTimeFieldSpec.json   |  54 ++++++
 .../resources/memory_estimation/table-config.json  |  49 +++++
 25 files changed, 1116 insertions(+), 77 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
index 8a64892..f45d6bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
@@ -155,8 +155,9 @@ public class InputManager {
     }
 
     _metaDataMap.keySet().forEach(colName -> {
-      double cardinality = _metaDataMap.get(colName).getCardinality();
-      _metaDataMap.get(colName).setCardinality(regulateCardinalityInfinitePopulation(cardinality, sampleSize));
+      int cardinality = _metaDataMap.get(colName).getCardinality();
+      double regulatedCardinality = regulateCardinalityInfinitePopulation(cardinality, sampleSize);
+      _metaDataMap.get(colName).setCardinality((int) Math.round(regulatedCardinality));
     });
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/DateTimeFieldSpecMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/DateTimeFieldSpecMetadata.java
new file mode 100644
index 0000000..3d6f093
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/DateTimeFieldSpecMetadata.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.io.metadata;
+
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * This class is used in {@link SchemaWithMetaData} to add metadata to {@link DateTimeFieldSpec}. Without this object,
+ * json representation of {@link SchemaWithMetaData} object cannot be deserialized to {@link Schema} object.
+ */
+public class DateTimeFieldSpecMetadata extends FieldMetadata {
+  private String _format;
+  private String _granularity;
+
+  public String getFormat() {
+    return _format;
+  }
+
+  public void setFormat(String format) {
+    _format = format;
+  }
+
+  public String getGranularity() {
+    return _granularity;
+  }
+
+  public void setGranularity(String granularity) {
+    _granularity = granularity;
+  }
+
+  @Override
+  public FieldType getFieldType() {
+    return FieldType.DATE_TIME;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java
index f39ede5..22b91db 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java
@@ -33,7 +33,7 @@ import static org.apache.pinot.controller.recommender.rules.io.params.Recommende
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class FieldMetadata extends FieldSpec {
-  double _cardinality = DEFAULT_CARDINALITY;
+  int _cardinality = DEFAULT_CARDINALITY;
   int _averageLength = DEFAULT_DATA_LENGTH;
   double _numValuesPerEntry = DEFAULT_AVERAGE_NUM_VALUES_PER_ENTRY; // for multi-values
 
@@ -55,12 +55,12 @@ public class FieldMetadata extends FieldSpec {
     this._numValuesPerEntry = numValuesPerEntry;
   }
 
-  public double getCardinality() {
+  public int getCardinality() {
     return _cardinality;
   }
 
   @JsonSetter(nulls = Nulls.SKIP)
-  public void setCardinality(double cardinality) {
+  public void setCardinality(int cardinality) {
     this._cardinality = cardinality;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java
index 2254082..7385093 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java
@@ -31,7 +31,7 @@ import java.util.List;
 public class SchemaWithMetaData {
   private List<FieldMetadata> _dimensionFieldSpecs = new ArrayList<>();
   private List<FieldMetadata> _metricFieldSpecs = new ArrayList<>();
-  private List<FieldMetadata> _dateTimeFieldSpecs = new ArrayList<>();
+  private List<DateTimeFieldSpecMetadata> _dateTimeFieldSpecs = new ArrayList<>();
   private TimeFieldSpecMetadata _timeFieldSpec;
 
 
@@ -51,15 +51,15 @@ public class SchemaWithMetaData {
     _metricFieldSpecs = metricFieldSpecs;
   }
 
-  public List<FieldMetadata> getDateTimeFieldSpecs() {
+  public List<DateTimeFieldSpecMetadata> getDateTimeFieldSpecs() {
     return _dateTimeFieldSpecs;
   }
 
-  public void setDateTimeFieldSpecs(List<FieldMetadata> dateTimeFieldSpecs) {
+  public void setDateTimeFieldSpecs(List<DateTimeFieldSpecMetadata> dateTimeFieldSpecs) {
     _dateTimeFieldSpecs = dateTimeFieldSpecs;
   }
 
-  public FieldMetadata getTimeFieldSpec() {
+  public TimeFieldSpecMetadata getTimeFieldSpec() {
     return _timeFieldSpec;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java
index 7c47d2e..74e9e98 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java
@@ -21,36 +21,41 @@ package org.apache.pinot.controller.recommender.io.metadata;
 import com.fasterxml.jackson.annotation.JsonSetter;
 import com.fasterxml.jackson.annotation.Nulls;
 
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.DEFAULT_CARDINALITY;
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.DEFAULT_DATA_LENGTH;
+
 
 public class TimeFieldSpecMetadata extends FieldMetadata {
 
-  private FieldMetadata _incomingGranularitySpec;
-  private FieldMetadata _outgoingGranularitySpec;
+  private TimeGranularitySpecMetadata _incomingGranularitySpec;
+  private TimeGranularitySpecMetadata _outgoingGranularitySpec;
 
-  public FieldMetadata getIncomingGranularitySpec() {
+  public TimeGranularitySpecMetadata getIncomingGranularitySpec() {
     return _incomingGranularitySpec;
   }
 
-  public void setIncomingGranularitySpec(FieldMetadata incomingGranularitySpec) {
+  public void setIncomingGranularitySpec(TimeGranularitySpecMetadata incomingGranularitySpec) {
     _incomingGranularitySpec = incomingGranularitySpec;
     if (_outgoingGranularitySpec == null) {
-      super.setNumValuesPerEntry(incomingGranularitySpec.getNumValuesPerEntry());
-      super.setAverageLength(incomingGranularitySpec.getAverageLength());
+      super.setNumValuesPerEntry(DEFAULT_CARDINALITY);
+      super.setAverageLength(DEFAULT_DATA_LENGTH);
       super.setCardinality(incomingGranularitySpec.getCardinality());
       super.setName(incomingGranularitySpec.getName());
+      super.setDataType(incomingGranularitySpec.getDataType());
     }
   }
 
-  public FieldMetadata getOutgoingGranularitySpec() {
+  public TimeGranularitySpecMetadata getOutgoingGranularitySpec() {
     return _outgoingGranularitySpec;
   }
 
-  public void setOutgoingGranularitySpec(FieldMetadata outgoingGranularitySpec) {
+  public void setOutgoingGranularitySpec(TimeGranularitySpecMetadata outgoingGranularitySpec) {
     _outgoingGranularitySpec = outgoingGranularitySpec;
-    super.setNumValuesPerEntry(outgoingGranularitySpec.getNumValuesPerEntry());
-    super.setAverageLength(outgoingGranularitySpec.getAverageLength());
+    super.setNumValuesPerEntry(DEFAULT_CARDINALITY);
+    super.setAverageLength(DEFAULT_DATA_LENGTH);
     super.setCardinality(outgoingGranularitySpec.getCardinality());
     super.setName(outgoingGranularitySpec.getName());
+    super.setDataType(outgoingGranularitySpec.getDataType());
   }
 
   @Override
@@ -67,7 +72,7 @@ public class TimeFieldSpecMetadata extends FieldMetadata {
 
   @Override
   @JsonSetter(nulls = Nulls.SKIP)
-  public void setCardinality(double cardinality) {
+  public void setCardinality(int cardinality) {
     ;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeGranularitySpecMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeGranularitySpecMetadata.java
new file mode 100644
index 0000000..33fbc40
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeGranularitySpecMetadata.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.io.metadata;
+
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.annotation.Nulls;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.DEFAULT_CARDINALITY;
+
+
+public class TimeGranularitySpecMetadata extends TimeGranularitySpec {
+  private int _cardinality = DEFAULT_CARDINALITY;
+
+  public int getCardinality() {
+    return _cardinality;
+  }
+
+  @JsonSetter(nulls = Nulls.SKIP)
+  public void setCardinality(int cardinality) {
+    _cardinality = cardinality;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
index 4a355b9..776113d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
@@ -85,7 +85,7 @@ public class RecommenderConstants {
   public static final String OFFLINE = "offline";
   public static final String REALTIME = "realtime";
   public static final String HYBRID = "hybrid";
-  public static final double DEFAULT_CARDINALITY = 1;
+  public static final int DEFAULT_CARDINALITY = 1;
   public static final double MIN_CARDINALITY = 1;
   public static final double DEFAULT_AVERAGE_NUM_VALUES_PER_ENTRY = 1d;
   public static final int DEFAULT_NULL_SIZE = 0;
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 43afd0f..a93df73 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -154,6 +154,11 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-minion</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
index fe3e182..f64c8e5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
@@ -132,11 +132,14 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
     final HashMap<String, Integer> cardinality = new HashMap<>();
     final HashMap<String, IntRange> range = new HashMap<>();
     final HashMap<String, Map<String, Object>> pattern = new HashMap<>();
+    final HashMap<String, Double> mvCountMap = new HashMap<>();
+    final HashMap<String, Integer> lengthMap = new HashMap<>();
 
     buildCardinalityRangeMaps(_schemaAnnFile, cardinality, range, pattern);
 
     final DataGeneratorSpec spec =
-        buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern);
+        buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern,
+            mvCountMap, lengthMap);
 
     final DataGenerator gen = new DataGenerator();
     gen.init(spec);
@@ -176,7 +179,8 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
 
   private DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
       HashMap<String, DataType> dataTypes, HashMap<String, FieldType> fieldTypes, HashMap<String, TimeUnit> timeUnits,
-      HashMap<String, Integer> cardinality, HashMap<String, IntRange> range, HashMap<String, Map<String, Object>> pattern) {
+      HashMap<String, Integer> cardinality, HashMap<String, IntRange> range, HashMap<String,
+      Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
     for (final FieldSpec fs : schema.getAllFieldSpecs()) {
       String col = fs.getName();
 
@@ -215,7 +219,7 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
       }
     }
 
-    return new DataGeneratorSpec(columns, cardinality, range, pattern, dataTypes, fieldTypes, timeUnits, FileFormat.AVRO,
+    return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes, timeUnits, FileFormat.AVRO,
         _outDir, _overwrite);
   }
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
index b7c5deb..addd0e6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -49,6 +50,7 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
   private static final int DEFAULT_RETENTION_FOR_DAILY_PUSH = 72;
   private static final int DEFAULT_RETENTION_FOR_WEEKLY_PUSH = 24*7 + 72;
   private static final int DEFAULT_RETENTION_FOR_MONTHLY_PUSH = 24*31 + 72;
+  private static final int DEFAULT_NUMBER_OF_ROWS = 10_000;
 
   @Option(name = "-tableConfigFile", required = true, metaVar = "<String>")
   private String _tableConfigFile;
@@ -71,9 +73,15 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
   @Option(name = "-numHours", metaVar = "<String>", usage = "number of hours to consume as comma separated values (default 2,3,4,5,6,7,8,9,10,11,12)")
   private String _numHours = "2,3,4,5,6,7,8,9,10,11,12";
 
-  @Option(name = "-sampleCompletedSegmentDir", required = true, metaVar = "<String>", usage = "Consume from the topic for n hours and provide the path of the segment dir after it completes")
+  @Option(name = "-sampleCompletedSegmentDir", required = false, metaVar = "<String>", usage = "Consume from the topic for n hours and provide the path of the segment dir after it completes")
   private String _sampleCompletedSegmentDir;
 
+  @Option(name = "-schemaWithMetadataFile", required = false, metaVar = "<String>", usage = "Schema file with extra information on each column describing characteristics of data")
+  private String _schemaWithMetadataFile;
+
+  @Option(name = "-numRows", required = false, metaVar = "<String>", usage = "Number of rows to be generated based on schema with metadata file")
+  private int _numRows;
+
   @Option(name = "-ingestionRate", required = true, metaVar = "<String>", usage = "Avg number of messages per second ingested on any one stream partition (assumed all partitions are uniform)")
   private int _ingestionRate;
 
@@ -128,12 +136,20 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
     return this;
   }
 
+  public RealtimeProvisioningHelperCommand setNumRows(int numRows) {
+    _numRows = numRows;
+    return this;
+  }
+
   @Override
   public String toString() {
-    return ("RealtimeProvisioningHelper -tableConfigFile " + _tableConfigFile + " -numPartitions "
-        + _numPartitions + " -pushFrequency " + _pushFrequency + " -numHosts " + _numHosts + " -numHours " + _numHours
-        + " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir + " -ingestionRate "
-        + _ingestionRate + " -maxUsableHostMemory " + _maxUsableHostMemory + " -retentionHours " + _retentionHours);
+    String segmentStr = _sampleCompletedSegmentDir != null
+        ? " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir
+        : " -schemaWithMetadataFile " + _schemaWithMetadataFile + " -numRows " + _numRows;
+    return "RealtimeProvisioningHelper -tableConfigFile " + _tableConfigFile + " -numPartitions " + _numPartitions
+        + " -pushFrequency " + _pushFrequency + " -numHosts " + _numHosts + " -numHours " + _numHours + segmentStr
+        + " -ingestionRate " + _ingestionRate + " -maxUsableHostMemory " + _maxUsableHostMemory + " -retentionHours "
+        + _retentionHours;
   }
 
   @Override
@@ -145,7 +161,8 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
   public String description() {
     return
         "Given the table config, partitions, retention and a sample completed segment for a realtime table to be setup, "
-            + "this tool will provide memory used by each host and an optimal segment size for various combinations of hours to consume and hosts";
+            + "this tool will provide memory used by each host and an optimal segment size for various combinations of hours to consume and hosts. "
+            + "Instead of a completed segment, if schema with characteristics of data is provided, a segment will be generated and used for memory estimation.";
   }
 
   @Override
@@ -171,6 +188,11 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
   @Override
   public boolean execute()
       throws IOException {
+
+    boolean segmentProvided = _sampleCompletedSegmentDir != null;
+    boolean characteristicsProvided = _schemaWithMetadataFile != null;
+    Preconditions.checkState(segmentProvided ^ characteristicsProvided, "Either completed segment should be provided or schema with characteristics file!");
+
     LOGGER.info("Executing command: {}", toString());
 
     TableConfig tableConfig;
@@ -217,12 +239,16 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
     // TODO: allow multiple segments.
     // Consuming: Build statsHistory using multiple segments. Use multiple data points of (totalDocs,numHoursConsumed) to calculate totalDocs for our numHours
     // Completed: Use multiple (completedSize,numHours) data points to calculate completed size for our numHours
-    File sampleCompletedSegmentFile = new File(_sampleCompletedSegmentDir);
 
     long maxUsableHostMemBytes = DataSizeUtils.toBytes(_maxUsableHostMemory);
 
-    MemoryEstimator memoryEstimator =
-        new MemoryEstimator(tableConfig, sampleCompletedSegmentFile, _ingestionRate, maxUsableHostMemBytes, tableRetentionHours);
+    if (_numRows == 0) {
+      _numRows = DEFAULT_NUMBER_OF_ROWS;
+    }
+
+    MemoryEstimator memoryEstimator = segmentProvided
+        ? new MemoryEstimator(tableConfig, new File(_sampleCompletedSegmentDir), _ingestionRate, maxUsableHostMemBytes, tableRetentionHours)
+        : new MemoryEstimator(tableConfig, new File(_schemaWithMetadataFile), _numRows, _ingestionRate, maxUsableHostMemBytes, tableRetentionHours);
     File sampleStatsHistory = memoryEstimator.initializeStatsHistory();
     memoryEstimator
         .estimateMemoryUsed(sampleStatsHistory, numHosts, numHours, totalConsumingPartitions, _retentionHours);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/BytesGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/BytesGenerator.java
new file mode 100644
index 0000000..c7624d0
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/BytesGenerator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import org.apache.commons.codec.binary.Hex;
+
+
+/**
+ * A class for generating data for a column with type BYTES
+ */
+public class BytesGenerator implements Generator {
+  private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
+
+  private final StringGenerator _stringGenerator;
+
+  public BytesGenerator(Integer cardinality, Integer entryLength) {
+    _stringGenerator = new StringGenerator(cardinality, DEFAULT_NUMBER_OF_VALUES_PER_ENTRY, entryLength);
+  }
+
+  @Override
+  public void init() {
+    _stringGenerator.init();
+  }
+
+  @Override
+  public Object next() {
+    String next = (String) _stringGenerator.next();
+    return Hex.encodeHexString(next.getBytes());
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
index 7944ecb..0ac754d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.tools.data.generator;
 
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.IntRange;
@@ -31,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -71,26 +73,27 @@ public class DataGenerator {
     for (final String column : genSpec.getColumns()) {
       DataType dataType = genSpec.getDataTypesMap().get(column);
 
+      Generator generator;
       if (genSpec.getPatternMap().containsKey(column)) {
-        generators.put(column,
-                GeneratorFactory.getGeneratorFor(
-                        PatternType.valueOf(genSpec.getPatternMap().get(column).get("type").toString()),
-                        genSpec.getPatternMap().get(column)));
+         generator = GeneratorFactory
+            .getGeneratorFor(PatternType.valueOf(genSpec.getPatternMap().get(column).get("type").toString()),
+                genSpec.getPatternMap().get(column));
 
       } else if (genSpec.getCardinalityMap().containsKey(column)) {
-        generators.put(column, GeneratorFactory.getGeneratorFor(dataType, genSpec.getCardinalityMap().get(column)));
-
+        generator = GeneratorFactory.getGeneratorFor(dataType,
+            genSpec.getCardinalityMap().get(column),
+            genSpec.getMvCountMap().get(column),
+            genSpec.getLengthMap().get(column),
+            genSpec.getTimeUnitMap().get(column));
       } else if (genSpec.getRangeMap().containsKey(column)) {
         IntRange range = genSpec.getRangeMap().get(column);
-        generators.put(column,
-                GeneratorFactory.getGeneratorFor(dataType, range.getMinimumInteger(), range.getMaximumInteger()));
-
+        generator = GeneratorFactory.getGeneratorFor(dataType, range.getMinimumInteger(), range.getMaximumInteger());
       } else {
         LOGGER.error("cardinality for this column does not exist : " + column);
         throw new RuntimeException("cardinality for this column does not exist");
       }
-
-      generators.get(column).init();
+      generator.init();
+      generators.put(column, generator);
     }
   }
 
@@ -115,7 +118,8 @@ public class DataGenerator {
         for (int j = 0; j < numPerFiles; j++) {
           Object[] values = new Object[genSpec.getColumns().size()];
           for (int k = 0; k < genSpec.getColumns().size(); k++) {
-            values[k] = generators.get(genSpec.getColumns().get(k)).next();
+            Object next = generators.get(genSpec.getColumns().get(k)).next();
+            values[k] = serializeIfMultiValue(next);
           }
           writer.append(StringUtils.join(values, ",")).append('\n');
         }
@@ -123,6 +127,13 @@ public class DataGenerator {
     }
   }
 
+  private Object serializeIfMultiValue(Object obj) {
+    if (obj instanceof List) {
+      return StringUtils.join((List) obj, ";");
+    }
+    return obj;
+  }
+
   public Schema fetchSchema() {
     final Schema schema = new Schema();
     for (final String column : genSpec.getColumns()) {
@@ -163,7 +174,7 @@ public class DataGenerator {
 
   public static void main(String[] args)
       throws IOException {
-    final String[] columns = {"column1", "column2", "column3", "column4", "column5"};
+
     final Map<String, DataType> dataTypes = new HashMap<>();
     final Map<String, FieldType> fieldTypes = new HashMap<>();
     final Map<String, TimeUnit> timeUnits = new HashMap<>();
@@ -171,18 +182,67 @@ public class DataGenerator {
     final Map<String, Integer> cardinality = new HashMap<>();
     final Map<String, IntRange> range = new HashMap<>();
     final Map<String, Map<String, Object>> template = new HashMap<>();
-
-    for (final String col : columns) {
-      dataTypes.put(col, DataType.INT);
-      fieldTypes.put(col, FieldType.DIMENSION);
-      cardinality.put(col, 1000);
+    Map<String, Double> mvCountMap = new HashMap<>();
+    Map<String, Integer> lengthMap = new HashMap<>();
+    List<String> columnNames = new ArrayList<>();
+
+    int cardinalityValue = 5;
+    int strLength = 5;
+
+    String colName = "colInt";
+    dataTypes.put(colName, DataType.INT);
+    fieldTypes.put(colName, FieldType.DIMENSION);
+    cardinality.put(colName, cardinalityValue);
+    columnNames.add(colName);
+    mvCountMap.put(colName, 3.7);
+
+    String colName2 = "colFloat";
+    dataTypes.put(colName2, DataType.FLOAT);
+    fieldTypes.put(colName2, FieldType.DIMENSION);
+    cardinality.put(colName2, cardinalityValue);
+    columnNames.add(colName2);
+    mvCountMap.put(colName2, 3.7);
+
+    String colName3 = "colString";
+    dataTypes.put(colName3, DataType.STRING);
+    fieldTypes.put(colName3, FieldType.DIMENSION);
+    cardinality.put(colName3, cardinalityValue);
+    columnNames.add(colName3);
+    mvCountMap.put(colName3, 3.7);
+    lengthMap.put(colName3, strLength);
+
+    String colName4 = "metric";
+    dataTypes.put(colName4, DataType.DOUBLE);
+    fieldTypes.put(colName4, FieldType.METRIC);
+    cardinality.put(colName4, cardinalityValue);
+    columnNames.add(colName4);
+
+    String colName5 = "colBytes";
+    dataTypes.put(colName5, DataType.BYTES);
+    fieldTypes.put(colName5, FieldType.DIMENSION);
+    cardinality.put(colName5, cardinalityValue);
+    columnNames.add(colName5);
+    mvCountMap.put(colName5, 3.7);
+    lengthMap.put(colName5, strLength + 1);
+
+    for (int i = 0; i < 2; i++) {
+      colName = "colString" + (i + 2);
+      dataTypes.put(colName, DataType.STRING);
+      fieldTypes.put(colName, FieldType.DIMENSION);
+      cardinality.put(colName, cardinalityValue);
+      columnNames.add(colName);
+      lengthMap.put(colName, strLength + i + 2);
     }
+
+    String outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "csv-data").toString();
     final DataGeneratorSpec spec =
-        new DataGeneratorSpec(Arrays.asList(columns), cardinality, range, template, dataTypes, fieldTypes, timeUnits,
-            FileFormat.AVRO, "/tmp/out", true);
+        new DataGeneratorSpec(columnNames, cardinality, range, template, mvCountMap, lengthMap,
+            dataTypes, fieldTypes, timeUnits,
+            FileFormat.CSV, outputDir, true);
 
     final DataGenerator gen = new DataGenerator();
     gen.init(spec);
-    gen.generateAvro(1000000L, 2);
+    gen.generateCsv(100, 1);
+    System.out.println("CSV data is generated under: " + outputDir);
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java
index c98cd4d..4ffeeb7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java
@@ -38,6 +38,8 @@ public class DataGeneratorSpec {
   private final Map<String, Integer> cardinalityMap;
   private final Map<String, IntRange> rangeMap;
   private final Map<String, Map<String, Object>> patternMap;
+  private final Map<String, Double> mvCountMap; // map of column name to average number of values per entry
+  private final Map<String, Integer> lengthMap; // map of column name to average length of the entry (used for string/byte generator)
 
   private final Map<String, DataType> dataTypesMap;
   private final Map<String, FieldType> fieldTypesMap;
@@ -48,18 +50,20 @@ public class DataGeneratorSpec {
   private final boolean overrideOutDir;
 
   public DataGeneratorSpec() {
-    this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
+    this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
         new HashMap<>(), new HashMap<>(), new HashMap<>(),
         FileFormat.AVRO, "/tmp/dataGen", true);
   }
 
   public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
-      Map<String, Map<String, Object>> patternMap, Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
+      Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap, Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
       FileFormat format, String outputDir, boolean override) {
     this.columns = columns;
     this.cardinalityMap = cardinalityMap;
     this.rangeMap = rangeMap;
     this.patternMap = patternMap;
+    this.mvCountMap = mvCountMap;
+    this.lengthMap = lengthMap;
 
     outputFileFormat = format;
     this.outputDir = outputDir;
@@ -102,6 +106,14 @@ public class DataGeneratorSpec {
     return patternMap;
   }
 
+  public Map<String, Double> getMvCountMap() {
+    return mvCountMap;
+  }
+
+  public Map<String, Integer> getLengthMap() {
+    return lengthMap;
+  }
+
   public FileFormat getOutputFileFormat() {
     return outputFileFormat;
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java
index 215bf9f..94120b6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tools.data.generator;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 import java.util.Map;
@@ -28,12 +29,18 @@ import java.util.Map;
  */
 
 public class GeneratorFactory {
-  public static Generator getGeneratorFor(DataType type, int cardinality) {
+  public static Generator getGeneratorFor(DataType type, Integer cardinality, Double numberOfValuesPerEntry,
+      Integer entryLength, TimeUnit timeUnit) {
     if (type == DataType.STRING) {
-      return new StringGenerator(cardinality);
+      return new StringGenerator(cardinality, numberOfValuesPerEntry, entryLength);
     }
-
-    return new NumberGenerator(cardinality, type, false);
+    if (type == DataType.BYTES) {
+      return new BytesGenerator(cardinality, entryLength);
+    }
+    if (timeUnit != null) {
+      return new TimeGenerator(cardinality, type, timeUnit);
+    }
+    return new NumberGenerator(cardinality, type, numberOfValuesPerEntry);
   }
 
   public static Generator getGeneratorFor(DataType dataType, int start, int end) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelper.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelper.java
new file mode 100644
index 0000000..ff62818
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelper.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Supplier;
+
+
+/**
+ * A helper class for generating multi value entries
+ */
+public class MultiValueGeneratorHelper {
+
+  /**
+   * Generate MV entries
+   *
+   * @param numberOfValuesPerEntry number of values per row
+   * @param rand random object
+   * @param nextItemFunc function to get the next random item
+   * @return list of generated values for the entry
+   */
+  public static List<Object> generateMultiValueEntries(double numberOfValuesPerEntry, Random rand,
+      Supplier<Object> nextItemFunc) {
+    List<Object> entries = new ArrayList<>();
+    int i = 0;
+    for (; i < numberOfValuesPerEntry - 1; i++) {
+      entries.add(nextItemFunc.get());
+    }
+    // last item
+    if (rand.nextDouble() < numberOfValuesPerEntry - i) {
+      entries.add(nextItemFunc.get());
+    }
+    return entries;
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java
index 32d697f..15bdba5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tools.data.generator;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -32,10 +33,11 @@ import org.slf4j.LoggerFactory;
 
 public class NumberGenerator implements Generator {
   private static final Logger LOGGER = LoggerFactory.getLogger(NumberGenerator.class);
+  private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
 
   private final int cardinality;
   private final DataType columnType;
-  private final boolean sorted;
+  private final double numberOfValuesPerEntry;
 
   private List<Integer> intValues;
   private List<Double> doubleValues;
@@ -44,10 +46,13 @@ public class NumberGenerator implements Generator {
 
   private final Random random;
 
-  public NumberGenerator(Integer cardinality, DataType type, boolean sorted) {
-    this.cardinality = cardinality.intValue();
+  public NumberGenerator(Integer cardinality, DataType type, Double numberOfValuesPerEntry) {
+    this.cardinality = cardinality;
+    this.numberOfValuesPerEntry =
+        numberOfValuesPerEntry != null ? numberOfValuesPerEntry : DEFAULT_NUMBER_OF_VALUES_PER_ENTRY;
+    Preconditions.checkState(this.numberOfValuesPerEntry >= 1,
+        "Number of values per entry (should be >= 1): " + this.numberOfValuesPerEntry);
     columnType = type;
-    this.sorted = sorted;
     random = new Random(System.currentTimeMillis());
   }
 
@@ -61,7 +66,7 @@ public class NumberGenerator implements Generator {
         final int start = rand.nextInt(cardinality);
         final int end = start + cardinality;
         for (int i = start; i < end; i++) {
-          intValues.add(new Integer(i));
+          intValues.add(i);
         }
         break;
       case LONG:
@@ -69,15 +74,15 @@ public class NumberGenerator implements Generator {
         final long longStart = rand.nextInt(cardinality);
         final long longEnd = longStart + cardinality;
         for (long i = longStart; i < longEnd; i++) {
-          longValues.add(new Long(i));
+          longValues.add(i);
         }
         break;
       case FLOAT:
         floatValues = new ArrayList<Float>();
-        final float floatStart = rand.nextFloat() * rand.nextInt(1000);
+        final float floatStart = Math.round(rand.nextFloat()* 100.0f) / 100.0f; // round to two decimal places
         int floatCounter = 1;
         while (true) {
-          floatValues.add(new Float(floatStart + 0.1f));
+          floatValues.add(floatStart + floatCounter);
           if (floatCounter == cardinality) {
             break;
           }
@@ -86,10 +91,10 @@ public class NumberGenerator implements Generator {
         break;
       case DOUBLE:
         doubleValues = new ArrayList<Double>();
-        final double doubleStart = rand.nextDouble() * rand.nextInt(10000);
+        final double doubleStart = Math.round(rand.nextDouble() * 100.0) / 100.0; // round to two decimal places
         int doubleCounter = 1;
         while (true) {
-          doubleValues.add(new Double(doubleStart + 0.1d));
+          doubleValues.add(doubleStart + doubleCounter);
           if (doubleCounter == cardinality) {
             break;
           }
@@ -104,6 +109,13 @@ public class NumberGenerator implements Generator {
 
   @Override
   public Object next() {
+    if (numberOfValuesPerEntry == 1) {
+      return getNextNumber();
+    }
+    return MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, random, this::getNextNumber);
+  }
+
+  private Number getNextNumber() {
     switch (columnType) {
       case INT:
         return intValues.get(random.nextInt(cardinality));
@@ -120,7 +132,7 @@ public class NumberGenerator implements Generator {
   }
 
   public static void main(String[] args) {
-    final NumberGenerator gen = new NumberGenerator(10000000, DataType.LONG, false);
+    final NumberGenerator gen = new NumberGenerator(10000000, DataType.LONG, null);
     gen.init();
     for (int i = 0; i < 1000; i++) {
       System.out.println(gen.next());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java
index 8e0b963..7770364 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tools.data.generator;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -31,15 +32,23 @@ import org.apache.commons.lang.RandomStringUtils;
  */
 
 public class StringGenerator implements Generator {
-
-  private static final int lengthOfEachString = 10;
+  private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
+  private static final int DEFAULT_LENGTH_OF_EACH_STRING = 10;
 
   private final int cardinality;
   private final Random rand;
+  private final double numberOfValuesPerEntry;
+  private final int lengthOfEachString;
+
   private List<String> vals;
 
-  public StringGenerator(Integer cardinality) {
-    this.cardinality = cardinality.intValue();
+  public StringGenerator(Integer cardinality, Double numberOfValuesPerEntry, Integer lengthOfEachString) {
+    this.cardinality = cardinality;
+    this.numberOfValuesPerEntry =
+        numberOfValuesPerEntry != null ? numberOfValuesPerEntry : DEFAULT_NUMBER_OF_VALUES_PER_ENTRY;
+    this.lengthOfEachString = lengthOfEachString != null ? lengthOfEachString : DEFAULT_LENGTH_OF_EACH_STRING;
+    Preconditions.checkState(this.numberOfValuesPerEntry >= 1,
+        "Number of values per entry (should be >= 1): " + this.numberOfValuesPerEntry);
     rand = new Random(System.currentTimeMillis());
   }
 
@@ -56,11 +65,18 @@ public class StringGenerator implements Generator {
 
   @Override
   public Object next() {
+    if (numberOfValuesPerEntry == 1) {
+      return getNextString();
+    }
+    return MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, rand, this::getNextString);
+  }
+
+  private String getNextString() {
     return vals.get(rand.nextInt(cardinality));
   }
 
   public static void main(String[] args) {
-    final StringGenerator gen = new StringGenerator(10000);
+    final StringGenerator gen = new StringGenerator(10000, null, null);
     gen.init();
     for (int i = 0; i < 1000000; i++) {
       System.out.println(gen.next());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/TimeGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/TimeGenerator.java
new file mode 100644
index 0000000..29d80e1
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/TimeGenerator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * A class to generate data for a time column
+ */
+public class TimeGenerator implements Generator {
+  private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
+
+  private final NumberGenerator _numberGenerator;
+  private final FieldSpec.DataType _dataType;
+  private final Number _initialValue;
+
+  public TimeGenerator(Integer cardinality, FieldSpec.DataType dataType, TimeUnit timeUnit) {
+    _numberGenerator = new NumberGenerator(cardinality, dataType, DEFAULT_NUMBER_OF_VALUES_PER_ENTRY);
+    _dataType = dataType;
+    Date now = new Date();
+    _initialValue = convert(now, timeUnit, dataType);
+  }
+
+  @Override
+  public void init() {
+    _numberGenerator.init();
+  }
+
+  @Override
+  public Object next() {
+    Object next = _numberGenerator.next();
+    if (_dataType == FieldSpec.DataType.LONG) {
+      return ((long) next) + _initialValue.longValue();
+    }
+    return ((int) next) + _initialValue.intValue();
+  }
+
+  @VisibleForTesting
+  static Number convert(Date date, TimeUnit timeUnit, FieldSpec.DataType dataType) {
+    long convertedTime = timeUnit.convert(date.getTime(), TimeUnit.MILLISECONDS);
+    if (dataType == FieldSpec.DataType.LONG) {
+      return convertedTime;
+    }
+    if (dataType == FieldSpec.DataType.INT) {
+      return (int) convertedTime;
+    }
+    throw new IllegalArgumentException("Time column can be only INT or LONG: " + dataType);
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index ef0484f..ca760a2 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -20,23 +20,52 @@ package org.apache.pinot.tools.realtime.provisioning;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.recommender.io.metadata.DateTimeFieldSpecMetadata;
+import org.apache.pinot.controller.recommender.io.metadata.FieldMetadata;
+import org.apache.pinot.controller.recommender.io.metadata.SchemaWithMetaData;
+import org.apache.pinot.controller.recommender.io.metadata.TimeFieldSpecMetadata;
+import org.apache.pinot.controller.recommender.io.metadata.TimeGranularitySpecMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.core.io.readerwriter.RealtimeIndexOffHeapMemoryManager;
 import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.DataSizeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.data.generator.DataGenerator;
+import org.apache.pinot.tools.data.generator.DataGeneratorSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -70,6 +99,9 @@ public class MemoryEstimator {
   private String[][] _consumingMemoryPerHost;
   private String[][] _numSegmentsQueriedPerHost;
 
+  /**
+   * Constructor used for processing the given completed segment
+   */
   public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, int ingestionRate,
       long maxUsableHostMemory, int tableRetentionHours) {
     _maxUsableHostMemory = maxUsableHostMemory;
@@ -108,6 +140,18 @@ public class MemoryEstimator {
   }
 
   /**
+   * Constructor used for processing the given data characteristics (instead of completed segment)
+   */
+  public MemoryEstimator(TableConfig tableConfig, File schemaWithMetadataFile, int numberOfRows, int ingestionRate,
+      long maxUsableHostMemory, int tableRetentionHours) {
+    this(tableConfig,
+        generateCompletedSegment(schemaWithMetadataFile, tableConfig, numberOfRows),
+        ingestionRate,
+        maxUsableHostMemory,
+        tableRetentionHours);
+  }
+
+  /**
    * Initialize the stats file using the sample segment provided.
    * <br>This involves indexing each row of the sample segment using MutableSegmentImpl. This is equivalent to consuming the rows of a segment.
    * Although they will be in a different order than consumed by the host, the stats should be equivalent.
@@ -387,4 +431,160 @@ public class MemoryEstimator {
   public String[][] getNumSegmentsQueriedPerHost() {
     return _numSegmentsQueriedPerHost;
   }
+
+  private static File generateCompletedSegment(File schemaWithMetadataFile, TableConfig tableConfig, int numberOfRows) {
+    SegmentGenerator segmentGenerator = new SegmentGenerator(schemaWithMetadataFile, tableConfig, numberOfRows, true);
+    return segmentGenerator.generate();
+  }
+
+  /**
+   * This class is used in Memory Estimator to generate a segment based on the given characteristics of data
+   */
+  public static class SegmentGenerator {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerator.class);
+    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
+
+    private File _schemaWithMetadataFile;
+    private TableConfig _tableConfig;
+    private int _numberOfRows;
+    private boolean _deleteCsv;
+
+    public SegmentGenerator(File schemaWithMetadataFile, TableConfig tableConfig, int numberOfRows, boolean deleteCsv) {
+      _schemaWithMetadataFile = schemaWithMetadataFile;
+      _tableConfig = tableConfig;
+      _numberOfRows = numberOfRows;
+      _deleteCsv = deleteCsv;
+    }
+
+    public File generate() {
+      Date now = new Date();
+      File csvDataFile = generateData(now);
+      File segment = createSegment(csvDataFile, now);
+      if (_deleteCsv) {
+        csvDataFile.delete();
+      }
+      return segment;
+    }
+
+    private File generateData(Date now) {
+
+      // deserialize schema
+      SchemaWithMetaData schema;
+      try {
+        schema = JsonUtils.fileToObject(_schemaWithMetadataFile, SchemaWithMetaData.class);
+      } catch (Exception e) {
+        throw new RuntimeException(String.format("Cannot read schema file '%s' to SchemaWithMetaData object.",
+            _schemaWithMetadataFile));
+      }
+
+      // create maps of "column name" to ...
+      Map<String, Integer> lengths = new HashMap<>();
+      Map<String, Double> mvCounts = new HashMap<>();
+      Map<String, Integer> cardinalities = new HashMap<>();
+      Map<String, FieldSpec.DataType> dataTypes = new HashMap<>();
+      Map<String, FieldSpec.FieldType> fieldTypes = new HashMap<>();
+      Map<String, TimeUnit> timeUnits = new HashMap<>();
+      List<String> colNames = new ArrayList<>();
+      List<FieldMetadata> dimensions = schema.getDimensionFieldSpecs();
+      List<FieldMetadata> metrics = schema.getMetricFieldSpecs();
+      List<DateTimeFieldSpecMetadata> dateTimes = schema.getDateTimeFieldSpecs();
+      Stream.concat(Stream.concat(dimensions.stream(), metrics.stream()), dateTimes.stream()).forEach(column -> {
+        String name = column.getName();
+        colNames.add(name);
+        lengths.put(name, column.getAverageLength());
+        mvCounts.put(name, column.getNumValuesPerEntry());
+        cardinalities.put(name, column.getCardinality());
+        dataTypes.put(name, column.getDataType());
+        fieldTypes.put(name, column.getFieldType());
+      });
+      dateTimes.forEach(dateTimeColumn -> {
+        TimeUnit timeUnit = new DateTimeFormatSpec(dateTimeColumn.getFormat()).getColumnUnit();
+        timeUnits.put(dateTimeColumn.getName(), timeUnit);
+      });
+      TimeFieldSpecMetadata timeSpec = schema.getTimeFieldSpec();
+      if (timeSpec != null) {
+        String name = timeSpec.getName();
+        colNames.add(name);
+        cardinalities.put(name, timeSpec.getCardinality());
+        dataTypes.put(name, timeSpec.getDataType());
+        fieldTypes.put(name, timeSpec.getFieldType());
+        TimeGranularitySpecMetadata timeGranSpec = timeSpec.getOutgoingGranularitySpec() != null
+            ? timeSpec.getOutgoingGranularitySpec()
+            : timeSpec.getIncomingGranularitySpec();
+        timeUnits.put(name, timeGranSpec.getTimeType());
+      }
+
+      // generate data
+      String outputDir = getOutputDir(now, "-csv");
+      DataGeneratorSpec spec =
+          new DataGeneratorSpec(colNames, cardinalities, new HashMap<>(), new HashMap<>(), mvCounts, lengths, dataTypes,
+              fieldTypes, timeUnits, FileFormat.CSV, outputDir, true);
+      DataGenerator dataGenerator = new DataGenerator();
+      try {
+        dataGenerator.init(spec);
+        dataGenerator.generateCsv(_numberOfRows, 1);
+        File outputFile = Paths.get(outputDir, "output_0.csv").toFile();
+        LOGGER.info("Successfully generated data file: {}", outputFile);
+        return outputFile;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private File createSegment(File csvDataFile, Date now) {
+
+      // deserialize schema
+      Schema schema;
+      try {
+        schema = JsonUtils.fileToObject(_schemaWithMetadataFile, Schema.class);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Cannot read schema file '%s' to schema object.", _schemaWithMetadataFile));
+      }
+
+      // create segment
+      LOGGER.info("Started creating segment from file: {}", csvDataFile);
+      String outDir = getOutputDir(now, "-segment");
+      SegmentGeneratorConfig segmentGeneratorConfig = getSegmentGeneratorConfig(csvDataFile, schema, outDir);
+      SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+      try {
+        driver.init(segmentGeneratorConfig);
+        driver.build();
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while generating segment from file: " + csvDataFile, e);
+      }
+      String segmentName = driver.getSegmentName();
+      File indexDir = new File(outDir, segmentName);
+      LOGGER.info("Successfully created segment: {} at directory: {}", segmentName, indexDir);
+
+      // verify segment
+      LOGGER.info("Verifying the segment by loading it");
+      ImmutableSegment segment;
+      try {
+        segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while verifying the created segment", e);
+      }
+      LOGGER.info("Successfully loaded segment: {} of size: {} bytes", segmentName,
+          segment.getSegmentSizeBytes());
+      segment.destroy();
+
+      return indexDir;
+    }
+
+    private SegmentGeneratorConfig getSegmentGeneratorConfig(File csvDataFile, Schema schema, String outDir) {
+      SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
+      segmentGeneratorConfig.setInputFilePath(csvDataFile.getPath());
+      segmentGeneratorConfig.setFormat(FileFormat.CSV);
+      segmentGeneratorConfig.setOutDir(outDir);
+      segmentGeneratorConfig.setReaderConfig(new CSVRecordReaderConfig());
+      segmentGeneratorConfig.setTableName(_tableConfig.getTableName());
+      segmentGeneratorConfig.setSequenceId(0);
+      return segmentGeneratorConfig;
+    }
+
+    private String getOutputDir(Date date, String suffix) {
+      return Paths.get(System.getProperty("java.io.tmpdir"), DATE_FORMAT.format(date) + suffix).toString();
+    }
+  }
 }
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelperTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelperTest.java
new file mode 100644
index 0000000..6a98074
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelperTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.function.Supplier;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+
+public class MultiValueGeneratorHelperTest {
+
+  @Test
+  public void testGenerateMultiValueEntries() {
+
+    Random rand = mock(Random.class);
+    when(rand.nextInt()).thenReturn(10, 20, 30, 40, 50, 60, 70);
+    when(rand.nextDouble()).thenReturn(0.4);
+    Supplier<Object> next = rand::nextInt;
+
+    double numberOfValuesPerEntry = 3.3;
+    assertEquals(MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, rand, next),
+        Arrays.asList(10, 20, 30));
+
+    numberOfValuesPerEntry = 3.5;
+    assertEquals(MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, rand, next),
+        Arrays.asList(40, 50, 60, 70));
+  }
+}
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/TimeGeneratorTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/TimeGeneratorTest.java
new file mode 100644
index 0000000..b235a80
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/TimeGeneratorTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeGeneratorTest {
+
+  @Test
+  public void testConvert() {
+    int oneHourInMillis = 60 * 60 * 1000;
+    Date date = new Date(oneHourInMillis + 1);
+
+    // seconds
+    Number convertedTime = TimeGenerator.convert(date, TimeUnit.SECONDS, FieldSpec.DataType.LONG);
+    assertTrue(convertedTime instanceof Long);
+    assertEquals(3600L, convertedTime);
+
+    // minutes
+    convertedTime = TimeGenerator.convert(date, TimeUnit.MINUTES, FieldSpec.DataType.INT);
+    assertTrue(convertedTime instanceof Integer);
+    assertEquals(60, convertedTime);
+
+    // hours
+    convertedTime = TimeGenerator.convert(date, TimeUnit.HOURS, FieldSpec.DataType.INT);
+    assertTrue(convertedTime instanceof Integer);
+    assertEquals(1, convertedTime);
+  }
+}
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimatorTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimatorTest.java
new file mode 100644
index 0000000..46f2727
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimatorTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.realtime.provisioning;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class MemoryEstimatorTest {
+
+  /**
+   * Verifies that a segment generated from a schema-with-metadata file (legacy
+   * {@code timeFieldSpec}) reports the configured cardinality and maxNumberOfMultiValues
+   * for every column in its {@code metadata.properties}.
+   */
+  @Test
+  public void testSegmentGenerator() throws Exception {
+    runTest("memory_estimation/schema-with-metadata.json", metadata -> {
+      assertEquals(extract(metadata, "segment.total.docs = (\\d+)"), "10000");
+      assertEquals(extract(metadata, "column.colInt.cardinality = (\\d+)"), "100");
+      assertEquals(extract(metadata, "column.colIntMV.cardinality = (\\d+)"), "150");
+      assertEquals(extract(metadata, "column.colFloat.cardinality = (\\d+)"), "200");
+      assertEquals(extract(metadata, "column.colFloatMV.cardinality = (\\d+)"), "250");
+      assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "300");
+      assertEquals(extract(metadata, "column.colStringMV.cardinality = (\\d+)"), "350");
+      assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "400");
+      assertEquals(extract(metadata, "column.colLong.cardinality = (\\d+)"), "500");
+      assertEquals(extract(metadata, "column.colLongMV.cardinality = (\\d+)"), "550");
+      assertEquals(extract(metadata, "column.colDouble.cardinality = (\\d+)"), "600");
+      assertEquals(extract(metadata, "column.colDoubleMV.cardinality = (\\d+)"), "650");
+      assertEquals(extract(metadata, "column.colDoubleMetric.cardinality = (\\d+)"), "700");
+      assertEquals(extract(metadata, "column.colFloatMetric.cardinality = (\\d+)"), "800");
+      assertEquals(extract(metadata, "column.colTime.cardinality = (\\d+)"), "900");
+      assertEquals(extract(metadata, "column.colInt.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colIntMV.maxNumberOfMultiValues = (\\d+)"), "3");
+      assertEquals(extract(metadata, "column.colFloat.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colFloatMV.maxNumberOfMultiValues = (\\d+)"), "2");
+      assertEquals(extract(metadata, "column.colString.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colStringMV.maxNumberOfMultiValues = (\\d+)"), "2");
+      assertEquals(extract(metadata, "column.colBytes.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colLong.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colLongMV.maxNumberOfMultiValues = (\\d+)"), "3");
+      assertEquals(extract(metadata, "column.colDouble.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colDoubleMV.maxNumberOfMultiValues = (\\d+)"), "4");
+      assertEquals(extract(metadata, "column.colDoubleMetric.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colFloatMetric.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colTime.maxNumberOfMultiValues = (\\d+)"), "0");
+    });
+  }
+
+  /**
+   * Same as {@link #testSegmentGenerator()} but using the newer {@code dateTimeFieldSpecs}
+   * schema format instead of the legacy {@code timeFieldSpec}.
+   */
+  @Test
+  public void testSegmentGenerator_withDateTimeFieldSpec() throws Exception {
+    runTest("memory_estimation/schema-with-metadata__dateTimeFieldSpec.json", metadata -> {
+      assertEquals(extract(metadata, "segment.total.docs = (\\d+)"), "10000");
+      assertEquals(extract(metadata, "column.colInt.cardinality = (\\d+)"), "500");
+      assertEquals(extract(metadata, "column.colFloat.cardinality = (\\d+)"), "600");
+      assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "700");
+      assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "800");
+      assertEquals(extract(metadata, "column.colMetric.cardinality = (\\d+)"), "900");
+      assertEquals(extract(metadata, "column.colTime.cardinality = (\\d+)"), "250");
+      assertEquals(extract(metadata, "column.colTime2.cardinality = (\\d+)"), "750");
+      assertEquals(extract(metadata, "column.colInt.maxNumberOfMultiValues = (\\d+)"), "3");
+      assertEquals(extract(metadata, "column.colFloat.maxNumberOfMultiValues = (\\d+)"), "2");
+      assertEquals(extract(metadata, "column.colString.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colBytes.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colMetric.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colTime.maxNumberOfMultiValues = (\\d+)"), "0");
+      assertEquals(extract(metadata, "column.colTime2.maxNumberOfMultiValues = (\\d+)"), "0");
+    });
+  }
+
+  /**
+   * Generates a segment for the given schema file and runs {@code assertFunc} against the
+   * content of the segment's {@code metadata.properties}. The generated segment directory is
+   * always removed, even when an assertion fails.
+   */
+  private void runTest(String schemaFileName, Consumer<String> assertFunc) throws Exception {
+
+    // arrange inputs
+    File schemaFile = readFile(schemaFileName);
+    File tableConfigFile = readFile("memory_estimation/table-config.json");
+    TableConfig tableConfig = JsonUtils.fileToObject(tableConfigFile, TableConfig.class);
+    int numberOfRows = 10_000;
+
+    // act
+    MemoryEstimator.SegmentGenerator
+        segmentGenerator = new MemoryEstimator.SegmentGenerator(schemaFile, tableConfig, numberOfRows, true);
+    File generatedSegment = segmentGenerator.generate();
+
+    try {
+      // assert
+      Path metadataFile = Paths.get(generatedSegment.getPath(), "v3", "metadata.properties");
+      String metadata = new String(Files.readAllBytes(metadataFile));
+      assertFunc.accept(metadata);
+    } finally {
+      // cleanup in finally so the segment directory is not leaked on assertion failure
+      FileUtils.deleteDirectory(generatedSegment);
+    }
+  }
+
+  /**
+   * Extracts the first capturing group of {@code patternStr} from the metadata content.
+   * Fails with a descriptive error when the pattern does not match, instead of the opaque
+   * IllegalStateException that Matcher.group would otherwise throw.
+   */
+  private String extract(String metadataContent, String patternStr) {
+    Matcher matcher = Pattern.compile(patternStr).matcher(metadataContent);
+    if (!matcher.find()) {
+      throw new AssertionError("Pattern not found in segment metadata: " + patternStr);
+    }
+    return matcher.group(1);
+  }
+
+  /** Resolves a test classpath resource to a {@link File}, failing fast when it is missing. */
+  private File readFile(String fileName) throws Exception {
+    URL resource = getClass().getClassLoader().getResource(fileName);
+    if (resource == null) {
+      throw new AssertionError("Test resource not found on classpath: " + fileName);
+    }
+    return new File(resource.toURI());
+  }
+}
diff --git a/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata.json b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata.json
new file mode 100644
index 0000000..e048d87
--- /dev/null
+++ b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata.json
@@ -0,0 +1,93 @@
+{
+  "schemaName": "testTable",
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "colInt",
+      "cardinality": 100
+    },
+    {
+      "dataType": "INT",
+      "name": "colIntMV",
+      "singleValueField": false,
+      "cardinality": 150,
+      "numValuesPerEntry": 3
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "colFloat",
+      "cardinality": 200
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "colFloatMV",
+      "singleValueField": false,
+      "cardinality": 250,
+      "numValuesPerEntry": 1.7
+    },
+    {
+      "dataType": "STRING",
+      "name": "colString",
+      "cardinality": 300,
+      "averageLength": 10
+    },
+    {
+      "dataType": "STRING",
+      "name": "colStringMV",
+      "singleValueField": false,
+      "cardinality": 350,
+      "averageLength": 10,
+      "numValuesPerEntry": 1.3
+    },
+    {
+      "dataType": "BYTES",
+      "name": "colBytes",
+      "cardinality": 400,
+      "averageLength": 5
+    },
+    {
+      "dataType": "LONG",
+      "name": "colLong",
+      "cardinality": 500
+    },
+    {
+      "dataType": "LONG",
+      "name": "colLongMV",
+      "singleValueField": false,
+      "cardinality": 550,
+      "numValuesPerEntry": 2.8
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "colDouble",
+      "cardinality": 600
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "colDoubleMV",
+      "singleValueField": false,
+      "cardinality": 650,
+      "numValuesPerEntry": 3.4
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "dataType": "DOUBLE",
+      "name": "colDoubleMetric",
+      "cardinality": 700
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "colFloatMetric",
+      "cardinality": 800
+    }
+  ],
+  "timeFieldSpec": {
+    "incomingGranularitySpec" : {
+      "dataType": "LONG",
+      "name": "colTime",
+      "timeType": "DAYS",
+      "cardinality": 900
+    }
+  }
+}
diff --git a/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata__dateTimeFieldSpec.json b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata__dateTimeFieldSpec.json
new file mode 100644
index 0000000..c331a37
--- /dev/null
+++ b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata__dateTimeFieldSpec.json
@@ -0,0 +1,54 @@
+{
+  "schemaName": "testTable",
+  "dimensionFieldSpecs": [
+    {
+      "dataType": "INT",
+      "name": "colInt",
+      "singleValueField": false,
+      "cardinality": 500,
+      "numValuesPerEntry": 3
+    },
+    {
+      "dataType": "FLOAT",
+      "name": "colFloat",
+      "singleValueField": false,
+      "cardinality": 600,
+      "numValuesPerEntry": 1.7
+    },
+    {
+      "dataType": "STRING",
+      "name": "colString",
+      "cardinality": 700,
+      "averageLength": 10
+    },
+    {
+      "dataType": "BYTES",
+      "name": "colBytes",
+      "cardinality": 800,
+      "averageLength": 5
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "dataType": "DOUBLE",
+      "name": "colMetric",
+      "cardinality": 900
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "colTime",
+      "dataType": "INT",
+      "format": "1:DAYS:EPOCH",
+      "granularity": "1:DAYS",
+      "cardinality": 250
+    },
+    {
+      "name": "colTime2",
+      "dataType": "LONG",
+      "format": "1:MILLISECONDS:EPOCH",
+      "granularity": "1:MILLISECONDS",
+      "cardinality": 750
+    }
+  ]
+}
diff --git a/pinot-tools/src/test/resources/memory_estimation/table-config.json b/pinot-tools/src/test/resources/memory_estimation/table-config.json
new file mode 100644
index 0000000..5e06e3f
--- /dev/null
+++ b/pinot-tools/src/test/resources/memory_estimation/table-config.json
@@ -0,0 +1,49 @@
+{
+  "metadata": {},
+  "routing": {
+    "routingTableBuilderName": "PartitionAwareRealtime",
+    "routingTableBuilderOptions": {}
+  },
+  "segmentsConfig": {
+    "replicasPerPartition": 3,
+    "replication": "3",
+    "retentionTimeUnit": "DAYS",
+    "retentionTimeValue": "5",
+    "schemaName": "testTable",
+    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+    "segmentPushFrequency": "daily",
+    "segmentPushType": "APPEND",
+    "timeColumnName": "colTime",
+    "timeType": "DAYS"
+  },
+  "tableIndexConfig": {
+    "aggregateMetrics": true,
+    "invertedIndexColumns": [
+      "colInt",
+      "colString"
+    ],
+    "loadMode": "MMAP",
+    "noDictionaryColumns": [
+      "colBytes"
+    ],
+    "segmentFormatVersion": "v3",
+    "sortedColumn": [],
+    "streamConfigs": {
+      "realtime.segment.flush.threshold.size": 100000000,
+      "realtime.segment.flush.threshold.time": "6h",
+      "stream.kafka.clusterGroup": "aggregate-tracking",
+      "stream.kafka.consumer.factory.class.name": "com.linkedin.pinot.v2.server.LiPinotKafkaConsumerFactory",
+      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
+      "stream.kafka.consumer.type": "simple",
+      "stream.kafka.decoder.class.name": "com.linkedin.pinot.v2.server.LiKafkaDecoder",
+      "stream.kafka.topic.name": "UserGeneratedContentGestureCountEvent",
+      "streamType": "kafka"
+    }
+  },
+  "tableName": "testTable",
+  "tableType": "REALTIME",
+  "tenants": {
+    "broker": "test",
+    "server": "test"
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org