You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/02/18 04:02:07 UTC
[incubator-pinot] branch master updated: Improve Real Time
Provisioning Helper tool (#6546)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4c3af59 Improve Real Time Provisioning Helper tool (#6546)
4c3af59 is described below
commit 4c3af59e19c33fdb9162589b18cff8040ccb3b75
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Feb 17 20:01:48 2021 -0800
Improve Real Time Provisioning Helper tool (#6546)
* Improve Realtime Provisioning Helper tool
* Fix license header issue - was missing a `*`
* Use existing schemaWithMetadata class instead of dataCharacteristics
* Fix license header issue - was missing a `*`
* Move data objects to pinot-controller
---
.../controller/recommender/io/InputManager.java | 5 +-
.../io/metadata/DateTimeFieldSpecMetadata.java | 54 ++++++
.../recommender/io/metadata/FieldMetadata.java | 6 +-
.../io/metadata/SchemaWithMetaData.java | 8 +-
.../io/metadata/TimeFieldSpecMetadata.java | 27 +--
.../io/metadata/TimeGranularitySpecMetadata.java | 40 +++++
.../rules/io/params/RecommenderConstants.java | 2 +-
pinot-tools/pom.xml | 5 +
.../tools/admin/command/GenerateDataCommand.java | 10 +-
.../command/RealtimeProvisioningHelperCommand.java | 44 ++++-
.../pinot/tools/data/generator/BytesGenerator.java | 47 +++++
.../pinot/tools/data/generator/DataGenerator.java | 104 ++++++++---
.../tools/data/generator/DataGeneratorSpec.java | 16 +-
.../tools/data/generator/GeneratorFactory.java | 15 +-
.../data/generator/MultiValueGeneratorHelper.java | 54 ++++++
.../tools/data/generator/NumberGenerator.java | 34 ++--
.../tools/data/generator/StringGenerator.java | 26 ++-
.../pinot/tools/data/generator/TimeGenerator.java | 70 ++++++++
.../realtime/provisioning/MemoryEstimator.java | 200 +++++++++++++++++++++
.../generator/MultiValueGeneratorHelperTest.java | 49 +++++
.../tools/data/generator/TimeGeneratorTest.java | 52 ++++++
.../realtime/provisioning/MemoryEstimatorTest.java | 129 +++++++++++++
.../memory_estimation/schema-with-metadata.json | 93 ++++++++++
.../schema-with-metadata__dateTimeFieldSpec.json | 54 ++++++
.../resources/memory_estimation/table-config.json | 49 +++++
25 files changed, 1116 insertions(+), 77 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
index 8a64892..f45d6bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java
@@ -155,8 +155,9 @@ public class InputManager {
}
_metaDataMap.keySet().forEach(colName -> {
- double cardinality = _metaDataMap.get(colName).getCardinality();
- _metaDataMap.get(colName).setCardinality(regulateCardinalityInfinitePopulation(cardinality, sampleSize));
+ int cardinality = _metaDataMap.get(colName).getCardinality();
+ double regulatedCardinality = regulateCardinalityInfinitePopulation(cardinality, sampleSize);
+ _metaDataMap.get(colName).setCardinality((int) Math.round(regulatedCardinality));
});
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/DateTimeFieldSpecMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/DateTimeFieldSpecMetadata.java
new file mode 100644
index 0000000..3d6f093
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/DateTimeFieldSpecMetadata.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.io.metadata;
+
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * This class is used in {@link SchemaWithMetaData} to add metadata to {@link DateTimeFieldSpec}. Without this object,
+ * json representation of {@link SchemaWithMetaData} object cannot be deserialized to {@link Schema} object.
+ */
+public class DateTimeFieldSpecMetadata extends FieldMetadata {
+ private String _format;
+ private String _granularity;
+
+ public String getFormat() {
+ return _format;
+ }
+
+ public void setFormat(String format) {
+ _format = format;
+ }
+
+ public String getGranularity() {
+ return _granularity;
+ }
+
+ public void setGranularity(String granularity) {
+ _granularity = granularity;
+ }
+
+ @Override
+ public FieldType getFieldType() {
+ return FieldType.DATE_TIME;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java
index f39ede5..22b91db 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/FieldMetadata.java
@@ -33,7 +33,7 @@ import static org.apache.pinot.controller.recommender.rules.io.params.Recommende
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class FieldMetadata extends FieldSpec {
- double _cardinality = DEFAULT_CARDINALITY;
+ int _cardinality = DEFAULT_CARDINALITY;
int _averageLength = DEFAULT_DATA_LENGTH;
double _numValuesPerEntry = DEFAULT_AVERAGE_NUM_VALUES_PER_ENTRY; // for multi-values
@@ -55,12 +55,12 @@ public class FieldMetadata extends FieldSpec {
this._numValuesPerEntry = numValuesPerEntry;
}
- public double getCardinality() {
+ public int getCardinality() {
return _cardinality;
}
@JsonSetter(nulls = Nulls.SKIP)
- public void setCardinality(double cardinality) {
+ public void setCardinality(int cardinality) {
this._cardinality = cardinality;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java
index 2254082..7385093 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/SchemaWithMetaData.java
@@ -31,7 +31,7 @@ import java.util.List;
public class SchemaWithMetaData {
private List<FieldMetadata> _dimensionFieldSpecs = new ArrayList<>();
private List<FieldMetadata> _metricFieldSpecs = new ArrayList<>();
- private List<FieldMetadata> _dateTimeFieldSpecs = new ArrayList<>();
+ private List<DateTimeFieldSpecMetadata> _dateTimeFieldSpecs = new ArrayList<>();
private TimeFieldSpecMetadata _timeFieldSpec;
@@ -51,15 +51,15 @@ public class SchemaWithMetaData {
_metricFieldSpecs = metricFieldSpecs;
}
- public List<FieldMetadata> getDateTimeFieldSpecs() {
+ public List<DateTimeFieldSpecMetadata> getDateTimeFieldSpecs() {
return _dateTimeFieldSpecs;
}
- public void setDateTimeFieldSpecs(List<FieldMetadata> dateTimeFieldSpecs) {
+ public void setDateTimeFieldSpecs(List<DateTimeFieldSpecMetadata> dateTimeFieldSpecs) {
_dateTimeFieldSpecs = dateTimeFieldSpecs;
}
- public FieldMetadata getTimeFieldSpec() {
+ public TimeFieldSpecMetadata getTimeFieldSpec() {
return _timeFieldSpec;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java
index 7c47d2e..74e9e98 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeFieldSpecMetadata.java
@@ -21,36 +21,41 @@ package org.apache.pinot.controller.recommender.io.metadata;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.DEFAULT_CARDINALITY;
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.DEFAULT_DATA_LENGTH;
+
public class TimeFieldSpecMetadata extends FieldMetadata {
- private FieldMetadata _incomingGranularitySpec;
- private FieldMetadata _outgoingGranularitySpec;
+ private TimeGranularitySpecMetadata _incomingGranularitySpec;
+ private TimeGranularitySpecMetadata _outgoingGranularitySpec;
- public FieldMetadata getIncomingGranularitySpec() {
+ public TimeGranularitySpecMetadata getIncomingGranularitySpec() {
return _incomingGranularitySpec;
}
- public void setIncomingGranularitySpec(FieldMetadata incomingGranularitySpec) {
+ public void setIncomingGranularitySpec(TimeGranularitySpecMetadata incomingGranularitySpec) {
_incomingGranularitySpec = incomingGranularitySpec;
if (_outgoingGranularitySpec == null) {
- super.setNumValuesPerEntry(incomingGranularitySpec.getNumValuesPerEntry());
- super.setAverageLength(incomingGranularitySpec.getAverageLength());
+ super.setNumValuesPerEntry(DEFAULT_CARDINALITY);
+ super.setAverageLength(DEFAULT_DATA_LENGTH);
super.setCardinality(incomingGranularitySpec.getCardinality());
super.setName(incomingGranularitySpec.getName());
+ super.setDataType(incomingGranularitySpec.getDataType());
}
}
- public FieldMetadata getOutgoingGranularitySpec() {
+ public TimeGranularitySpecMetadata getOutgoingGranularitySpec() {
return _outgoingGranularitySpec;
}
- public void setOutgoingGranularitySpec(FieldMetadata outgoingGranularitySpec) {
+ public void setOutgoingGranularitySpec(TimeGranularitySpecMetadata outgoingGranularitySpec) {
_outgoingGranularitySpec = outgoingGranularitySpec;
- super.setNumValuesPerEntry(outgoingGranularitySpec.getNumValuesPerEntry());
- super.setAverageLength(outgoingGranularitySpec.getAverageLength());
+ super.setNumValuesPerEntry(DEFAULT_CARDINALITY);
+ super.setAverageLength(DEFAULT_DATA_LENGTH);
super.setCardinality(outgoingGranularitySpec.getCardinality());
super.setName(outgoingGranularitySpec.getName());
+ super.setDataType(outgoingGranularitySpec.getDataType());
}
@Override
@@ -67,7 +72,7 @@ public class TimeFieldSpecMetadata extends FieldMetadata {
@Override
@JsonSetter(nulls = Nulls.SKIP)
- public void setCardinality(double cardinality) {
+ public void setCardinality(int cardinality) {
;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeGranularitySpecMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeGranularitySpecMetadata.java
new file mode 100644
index 0000000..33fbc40
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/metadata/TimeGranularitySpecMetadata.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.io.metadata;
+
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.annotation.Nulls;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.DEFAULT_CARDINALITY;
+
+
+public class TimeGranularitySpecMetadata extends TimeGranularitySpec {
+ private int _cardinality = DEFAULT_CARDINALITY;
+
+ public int getCardinality() {
+ return _cardinality;
+ }
+
+ @JsonSetter(nulls = Nulls.SKIP)
+ public void setCardinality(int cardinality) {
+ _cardinality = cardinality;
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
index 4a355b9..776113d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/params/RecommenderConstants.java
@@ -85,7 +85,7 @@ public class RecommenderConstants {
public static final String OFFLINE = "offline";
public static final String REALTIME = "realtime";
public static final String HYBRID = "hybrid";
- public static final double DEFAULT_CARDINALITY = 1;
+ public static final int DEFAULT_CARDINALITY = 1;
public static final double MIN_CARDINALITY = 1;
public static final double DEFAULT_AVERAGE_NUM_VALUES_PER_ENTRY = 1d;
public static final int DEFAULT_NULL_SIZE = 0;
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 43afd0f..a93df73 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -154,6 +154,11 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-minion</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
index fe3e182..f64c8e5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
@@ -132,11 +132,14 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
final HashMap<String, Integer> cardinality = new HashMap<>();
final HashMap<String, IntRange> range = new HashMap<>();
final HashMap<String, Map<String, Object>> pattern = new HashMap<>();
+ final HashMap<String, Double> mvCountMap = new HashMap<>();
+ final HashMap<String, Integer> lengthMap = new HashMap<>();
buildCardinalityRangeMaps(_schemaAnnFile, cardinality, range, pattern);
final DataGeneratorSpec spec =
- buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern);
+ buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern,
+ mvCountMap, lengthMap);
final DataGenerator gen = new DataGenerator();
gen.init(spec);
@@ -176,7 +179,8 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
private DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
HashMap<String, DataType> dataTypes, HashMap<String, FieldType> fieldTypes, HashMap<String, TimeUnit> timeUnits,
- HashMap<String, Integer> cardinality, HashMap<String, IntRange> range, HashMap<String, Map<String, Object>> pattern) {
+ HashMap<String, Integer> cardinality, HashMap<String, IntRange> range, HashMap<String,
+ Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
for (final FieldSpec fs : schema.getAllFieldSpecs()) {
String col = fs.getName();
@@ -215,7 +219,7 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
}
}
- return new DataGeneratorSpec(columns, cardinality, range, pattern, dataTypes, fieldTypes, timeUnits, FileFormat.AVRO,
+ return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes, timeUnits, FileFormat.AVRO,
_outDir, _overwrite);
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
index b7c5deb..addd0e6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.admin.command;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -49,6 +50,7 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
private static final int DEFAULT_RETENTION_FOR_DAILY_PUSH = 72;
private static final int DEFAULT_RETENTION_FOR_WEEKLY_PUSH = 24*7 + 72;
private static final int DEFAULT_RETENTION_FOR_MONTHLY_PUSH = 24*31 + 72;
+ private static final int DEFAULT_NUMBER_OF_ROWS = 10_000;
@Option(name = "-tableConfigFile", required = true, metaVar = "<String>")
private String _tableConfigFile;
@@ -71,9 +73,15 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
@Option(name = "-numHours", metaVar = "<String>", usage = "number of hours to consume as comma separated values (default 2,3,4,5,6,7,8,9,10,11,12)")
private String _numHours = "2,3,4,5,6,7,8,9,10,11,12";
- @Option(name = "-sampleCompletedSegmentDir", required = true, metaVar = "<String>", usage = "Consume from the topic for n hours and provide the path of the segment dir after it completes")
+ @Option(name = "-sampleCompletedSegmentDir", required = false, metaVar = "<String>", usage = "Consume from the topic for n hours and provide the path of the segment dir after it completes")
private String _sampleCompletedSegmentDir;
+ @Option(name = "-schemaWithMetadataFile", required = false, metaVar = "<String>", usage = "Schema file with extra information on each column describing characteristics of data")
+ private String _schemaWithMetadataFile;
+
+ @Option(name = "-numRows", required = false, metaVar = "<String>", usage = "Number of rows to be generated based on schema with metadata file")
+ private int _numRows;
+
@Option(name = "-ingestionRate", required = true, metaVar = "<String>", usage = "Avg number of messages per second ingested on any one stream partition (assumed all partitions are uniform)")
private int _ingestionRate;
@@ -128,12 +136,20 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
return this;
}
+ public RealtimeProvisioningHelperCommand setNumRows(int numRows) {
+ _numRows = numRows;
+ return this;
+ }
+
@Override
public String toString() {
- return ("RealtimeProvisioningHelper -tableConfigFile " + _tableConfigFile + " -numPartitions "
- + _numPartitions + " -pushFrequency " + _pushFrequency + " -numHosts " + _numHosts + " -numHours " + _numHours
- + " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir + " -ingestionRate "
- + _ingestionRate + " -maxUsableHostMemory " + _maxUsableHostMemory + " -retentionHours " + _retentionHours);
+ String segmentStr = _sampleCompletedSegmentDir != null
+ ? " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir
+ : " -schemaWithMetadataFile " + _schemaWithMetadataFile + " -numRows " + _numRows;
+ return "RealtimeProvisioningHelper -tableConfigFile " + _tableConfigFile + " -numPartitions " + _numPartitions
+ + " -pushFrequency " + _pushFrequency + " -numHosts " + _numHosts + " -numHours " + _numHours + segmentStr
+ + " -ingestionRate " + _ingestionRate + " -maxUsableHostMemory " + _maxUsableHostMemory + " -retentionHours "
+ + _retentionHours;
}
@Override
@@ -145,7 +161,8 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
public String description() {
return
"Given the table config, partitions, retention and a sample completed segment for a realtime table to be setup, "
- + "this tool will provide memory used by each host and an optimal segment size for various combinations of hours to consume and hosts";
+ + "this tool will provide memory used by each host and an optimal segment size for various combinations of hours to consume and hosts. "
+ + "Instead of a completed segment, if schema with characteristics of data is provided, a segment will be generated and used for memory estimation.";
}
@Override
@@ -171,6 +188,11 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
@Override
public boolean execute()
throws IOException {
+
+ boolean segmentProvided = _sampleCompletedSegmentDir != null;
+ boolean characteristicsProvided = _schemaWithMetadataFile != null;
+ Preconditions.checkState(segmentProvided ^ characteristicsProvided, "Either completed segment should be provided or schema with characteristics file!");
+
LOGGER.info("Executing command: {}", toString());
TableConfig tableConfig;
@@ -217,12 +239,16 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
// TODO: allow multiple segments.
// Consuming: Build statsHistory using multiple segments. Use multiple data points of (totalDocs,numHoursConsumed) to calculate totalDocs for our numHours
// Completed: Use multiple (completedSize,numHours) data points to calculate completed size for our numHours
- File sampleCompletedSegmentFile = new File(_sampleCompletedSegmentDir);
long maxUsableHostMemBytes = DataSizeUtils.toBytes(_maxUsableHostMemory);
- MemoryEstimator memoryEstimator =
- new MemoryEstimator(tableConfig, sampleCompletedSegmentFile, _ingestionRate, maxUsableHostMemBytes, tableRetentionHours);
+ if (_numRows == 0) {
+ _numRows = DEFAULT_NUMBER_OF_ROWS;
+ }
+
+ MemoryEstimator memoryEstimator = segmentProvided
+ ? new MemoryEstimator(tableConfig, new File(_sampleCompletedSegmentDir), _ingestionRate, maxUsableHostMemBytes, tableRetentionHours)
+ : new MemoryEstimator(tableConfig, new File(_schemaWithMetadataFile), _numRows, _ingestionRate, maxUsableHostMemBytes, tableRetentionHours);
File sampleStatsHistory = memoryEstimator.initializeStatsHistory();
memoryEstimator
.estimateMemoryUsed(sampleStatsHistory, numHosts, numHours, totalConsumingPartitions, _retentionHours);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/BytesGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/BytesGenerator.java
new file mode 100644
index 0000000..c7624d0
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/BytesGenerator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import org.apache.commons.codec.binary.Hex;
+
+
+/**
+ * A class to generating data for a column with type BYTES
+ */
+public class BytesGenerator implements Generator {
+ private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
+
+ private final StringGenerator _stringGenerator;
+
+ public BytesGenerator(Integer cardinality, Integer entryLength) {
+ _stringGenerator = new StringGenerator(cardinality, DEFAULT_NUMBER_OF_VALUES_PER_ENTRY, entryLength);
+ }
+
+ @Override
+ public void init() {
+ _stringGenerator.init();
+ }
+
+ @Override
+ public Object next() {
+ String next = (String) _stringGenerator.next();
+ return Hex.encodeHexString(next.getBytes());
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
index 7944ecb..0ac754d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.tools.data.generator;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.IntRange;
@@ -31,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -71,26 +73,27 @@ public class DataGenerator {
for (final String column : genSpec.getColumns()) {
DataType dataType = genSpec.getDataTypesMap().get(column);
+ Generator generator;
if (genSpec.getPatternMap().containsKey(column)) {
- generators.put(column,
- GeneratorFactory.getGeneratorFor(
- PatternType.valueOf(genSpec.getPatternMap().get(column).get("type").toString()),
- genSpec.getPatternMap().get(column)));
+ generator = GeneratorFactory
+ .getGeneratorFor(PatternType.valueOf(genSpec.getPatternMap().get(column).get("type").toString()),
+ genSpec.getPatternMap().get(column));
} else if (genSpec.getCardinalityMap().containsKey(column)) {
- generators.put(column, GeneratorFactory.getGeneratorFor(dataType, genSpec.getCardinalityMap().get(column)));
-
+ generator = GeneratorFactory.getGeneratorFor(dataType,
+ genSpec.getCardinalityMap().get(column),
+ genSpec.getMvCountMap().get(column),
+ genSpec.getLengthMap().get(column),
+ genSpec.getTimeUnitMap().get(column));
} else if (genSpec.getRangeMap().containsKey(column)) {
IntRange range = genSpec.getRangeMap().get(column);
- generators.put(column,
- GeneratorFactory.getGeneratorFor(dataType, range.getMinimumInteger(), range.getMaximumInteger()));
-
+ generator = GeneratorFactory.getGeneratorFor(dataType, range.getMinimumInteger(), range.getMaximumInteger());
} else {
LOGGER.error("cardinality for this column does not exist : " + column);
throw new RuntimeException("cardinality for this column does not exist");
}
-
- generators.get(column).init();
+ generator.init();
+ generators.put(column, generator);
}
}
@@ -115,7 +118,8 @@ public class DataGenerator {
for (int j = 0; j < numPerFiles; j++) {
Object[] values = new Object[genSpec.getColumns().size()];
for (int k = 0; k < genSpec.getColumns().size(); k++) {
- values[k] = generators.get(genSpec.getColumns().get(k)).next();
+ Object next = generators.get(genSpec.getColumns().get(k)).next();
+ values[k] = serializeIfMultiValue(next);
}
writer.append(StringUtils.join(values, ",")).append('\n');
}
@@ -123,6 +127,13 @@ public class DataGenerator {
}
}
+ private Object serializeIfMultiValue(Object obj) {
+ if (obj instanceof List) {
+ return StringUtils.join((List) obj, ";");
+ }
+ return obj;
+ }
+
public Schema fetchSchema() {
final Schema schema = new Schema();
for (final String column : genSpec.getColumns()) {
@@ -163,7 +174,7 @@ public class DataGenerator {
public static void main(String[] args)
throws IOException {
- final String[] columns = {"column1", "column2", "column3", "column4", "column5"};
+
final Map<String, DataType> dataTypes = new HashMap<>();
final Map<String, FieldType> fieldTypes = new HashMap<>();
final Map<String, TimeUnit> timeUnits = new HashMap<>();
@@ -171,18 +182,67 @@ public class DataGenerator {
final Map<String, Integer> cardinality = new HashMap<>();
final Map<String, IntRange> range = new HashMap<>();
final Map<String, Map<String, Object>> template = new HashMap<>();
-
- for (final String col : columns) {
- dataTypes.put(col, DataType.INT);
- fieldTypes.put(col, FieldType.DIMENSION);
- cardinality.put(col, 1000);
+ Map<String, Double> mvCountMap = new HashMap<>();
+ Map<String, Integer> lengthMap = new HashMap<>();
+ List<String> columnNames = new ArrayList<>();
+
+ int cardinalityValue = 5;
+ int strLength = 5;
+
+ String colName = "colInt";
+ dataTypes.put(colName, DataType.INT);
+ fieldTypes.put(colName, FieldType.DIMENSION);
+ cardinality.put(colName, cardinalityValue);
+ columnNames.add(colName);
+ mvCountMap.put(colName, 3.7);
+
+ String colName2 = "colFloat";
+ dataTypes.put(colName2, DataType.FLOAT);
+ fieldTypes.put(colName2, FieldType.DIMENSION);
+ cardinality.put(colName2, cardinalityValue);
+ columnNames.add(colName2);
+ mvCountMap.put(colName2, 3.7);
+
+ String colName3 = "colString";
+ dataTypes.put(colName3, DataType.STRING);
+ fieldTypes.put(colName3, FieldType.DIMENSION);
+ cardinality.put(colName3, cardinalityValue);
+ columnNames.add(colName3);
+ mvCountMap.put(colName3, 3.7);
+ lengthMap.put(colName3, strLength);
+
+ String colName4 = "metric";
+ dataTypes.put(colName4, DataType.DOUBLE);
+ fieldTypes.put(colName4, FieldType.METRIC);
+ cardinality.put(colName4, cardinalityValue);
+ columnNames.add(colName4);
+
+ String colName5 = "colBytes";
+ dataTypes.put(colName5, DataType.BYTES);
+ fieldTypes.put(colName5, FieldType.DIMENSION);
+ cardinality.put(colName5, cardinalityValue);
+ columnNames.add(colName5);
+ mvCountMap.put(colName5, 3.7);
+ lengthMap.put(colName5, strLength + 1);
+
+ for (int i = 0; i < 2; i++) {
+ colName = "colString" + (i + 2);
+ dataTypes.put(colName, DataType.STRING);
+ fieldTypes.put(colName, FieldType.DIMENSION);
+ cardinality.put(colName, cardinalityValue);
+ columnNames.add(colName);
+ lengthMap.put(colName, strLength + i + 2);
}
+
+ String outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "csv-data").toString();
final DataGeneratorSpec spec =
- new DataGeneratorSpec(Arrays.asList(columns), cardinality, range, template, dataTypes, fieldTypes, timeUnits,
- FileFormat.AVRO, "/tmp/out", true);
+ new DataGeneratorSpec(columnNames, cardinality, range, template, mvCountMap, lengthMap,
+ dataTypes, fieldTypes, timeUnits,
+ FileFormat.CSV, outputDir, true);
final DataGenerator gen = new DataGenerator();
gen.init(spec);
- gen.generateAvro(1000000L, 2);
+ gen.generateCsv(100, 1);
+ System.out.println("CSV data is generated under: " + outputDir);
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java
index c98cd4d..4ffeeb7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGeneratorSpec.java
@@ -38,6 +38,8 @@ public class DataGeneratorSpec {
private final Map<String, Integer> cardinalityMap;
private final Map<String, IntRange> rangeMap;
private final Map<String, Map<String, Object>> patternMap;
+ private final Map<String, Double> mvCountMap; // map of column name to average number of values per entry
+ private final Map<String, Integer> lengthMap; // map of column name to average length of the entry (used for string/byte generator)
private final Map<String, DataType> dataTypesMap;
private final Map<String, FieldType> fieldTypesMap;
@@ -48,18 +50,20 @@ public class DataGeneratorSpec {
private final boolean overrideOutDir;
public DataGeneratorSpec() {
- this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
+ this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
new HashMap<>(), new HashMap<>(), new HashMap<>(),
FileFormat.AVRO, "/tmp/dataGen", true);
}
public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
- Map<String, Map<String, Object>> patternMap, Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
+ Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap, Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
FileFormat format, String outputDir, boolean override) {
this.columns = columns;
this.cardinalityMap = cardinalityMap;
this.rangeMap = rangeMap;
this.patternMap = patternMap;
+ this.mvCountMap = mvCountMap;
+ this.lengthMap = lengthMap;
outputFileFormat = format;
this.outputDir = outputDir;
@@ -102,6 +106,14 @@ public class DataGeneratorSpec {
return patternMap;
}
+ public Map<String, Double> getMvCountMap() {
+ return mvCountMap;
+ }
+
+ public Map<String, Integer> getLengthMap() {
+ return lengthMap;
+ }
+
public FileFormat getOutputFileFormat() {
return outputFileFormat;
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java
index 215bf9f..94120b6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/GeneratorFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.data.generator;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import java.util.Map;
@@ -28,12 +29,18 @@ import java.util.Map;
*/
public class GeneratorFactory {
- public static Generator getGeneratorFor(DataType type, int cardinality) {
+ public static Generator getGeneratorFor(DataType type, Integer cardinality, Double numberOfValuesPerEntry,
+ Integer entryLength, TimeUnit timeUnit) {
if (type == DataType.STRING) {
- return new StringGenerator(cardinality);
+ return new StringGenerator(cardinality, numberOfValuesPerEntry, entryLength);
}
-
- return new NumberGenerator(cardinality, type, false);
+ if (type == DataType.BYTES) {
+ return new BytesGenerator(cardinality, entryLength);
+ }
+ if (timeUnit != null) {
+ return new TimeGenerator(cardinality, type, timeUnit);
+ }
+ return new NumberGenerator(cardinality, type, numberOfValuesPerEntry);
}
public static Generator getGeneratorFor(DataType dataType, int start, int end) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelper.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelper.java
new file mode 100644
index 0000000..ff62818
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelper.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Supplier;
+
+
+/**
+ * A helper class for generating multi value entries
+ */
+public class MultiValueGeneratorHelper {
+
+ /**
+ * Generate MV entries
+ *
+ * @param numberOfValuesPerEntry average number of values per row (may be fractional)
+ * @param rand random object
+ * @param nextItemFunc function to get the next random item
+ * @return list of generated values for one multi-value entry
+ */
+ public static List<Object> generateMultiValueEntries(double numberOfValuesPerEntry, Random rand,
+ Supplier<Object> nextItemFunc) {
+ List<Object> entries = new ArrayList<>();
+ int i = 0;
+ for (; i < numberOfValuesPerEntry - 1; i++) {
+ entries.add(nextItemFunc.get());
+ }
+ // last item
+ if (rand.nextDouble() < numberOfValuesPerEntry - i) {
+ entries.add(nextItemFunc.get());
+ }
+ return entries;
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java
index 32d697f..15bdba5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/NumberGenerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.data.generator;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -32,10 +33,11 @@ import org.slf4j.LoggerFactory;
public class NumberGenerator implements Generator {
private static final Logger LOGGER = LoggerFactory.getLogger(NumberGenerator.class);
+ private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
private final int cardinality;
private final DataType columnType;
- private final boolean sorted;
+ private final double numberOfValuesPerEntry;
private List<Integer> intValues;
private List<Double> doubleValues;
@@ -44,10 +46,13 @@ public class NumberGenerator implements Generator {
private final Random random;
- public NumberGenerator(Integer cardinality, DataType type, boolean sorted) {
- this.cardinality = cardinality.intValue();
+ public NumberGenerator(Integer cardinality, DataType type, Double numberOfValuesPerEntry) {
+ this.cardinality = cardinality;
+ this.numberOfValuesPerEntry =
+ numberOfValuesPerEntry != null ? numberOfValuesPerEntry : DEFAULT_NUMBER_OF_VALUES_PER_ENTRY;
+ Preconditions.checkState(this.numberOfValuesPerEntry >= 1,
+ "Number of values per entry (should be >= 1): " + this.numberOfValuesPerEntry);
columnType = type;
- this.sorted = sorted;
random = new Random(System.currentTimeMillis());
}
@@ -61,7 +66,7 @@ public class NumberGenerator implements Generator {
final int start = rand.nextInt(cardinality);
final int end = start + cardinality;
for (int i = start; i < end; i++) {
- intValues.add(new Integer(i));
+ intValues.add(i);
}
break;
case LONG:
@@ -69,15 +74,15 @@ public class NumberGenerator implements Generator {
final long longStart = rand.nextInt(cardinality);
final long longEnd = longStart + cardinality;
for (long i = longStart; i < longEnd; i++) {
- longValues.add(new Long(i));
+ longValues.add(i);
}
break;
case FLOAT:
floatValues = new ArrayList<Float>();
- final float floatStart = rand.nextFloat() * rand.nextInt(1000);
+ final float floatStart = Math.round(rand.nextFloat()* 100.0f) / 100.0f; // round to two decimal places
int floatCounter = 1;
while (true) {
- floatValues.add(new Float(floatStart + 0.1f));
+ floatValues.add(floatStart + floatCounter);
if (floatCounter == cardinality) {
break;
}
@@ -86,10 +91,10 @@ public class NumberGenerator implements Generator {
break;
case DOUBLE:
doubleValues = new ArrayList<Double>();
- final double doubleStart = rand.nextDouble() * rand.nextInt(10000);
+ final double doubleStart = Math.round(rand.nextDouble() * 100.0) / 100.0; // round to two decimal places
int doubleCounter = 1;
while (true) {
- doubleValues.add(new Double(doubleStart + 0.1d));
+ doubleValues.add(doubleStart + doubleCounter);
if (doubleCounter == cardinality) {
break;
}
@@ -104,6 +109,13 @@ public class NumberGenerator implements Generator {
@Override
public Object next() {
+ if (numberOfValuesPerEntry == 1) {
+ return getNextNumber();
+ }
+ return MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, random, this::getNextNumber);
+ }
+
+ private Number getNextNumber() {
switch (columnType) {
case INT:
return intValues.get(random.nextInt(cardinality));
@@ -120,7 +132,7 @@ public class NumberGenerator implements Generator {
}
public static void main(String[] args) {
- final NumberGenerator gen = new NumberGenerator(10000000, DataType.LONG, false);
+ final NumberGenerator gen = new NumberGenerator(10000000, DataType.LONG, null);
gen.init();
for (int i = 0; i < 1000; i++) {
System.out.println(gen.next());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java
index 8e0b963..7770364 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/StringGenerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.data.generator;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -31,15 +32,23 @@ import org.apache.commons.lang.RandomStringUtils;
*/
public class StringGenerator implements Generator {
-
- private static final int lengthOfEachString = 10;
+ private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
+ private static final int DEFAULT_LENGTH_OF_EACH_STRING = 10;
private final int cardinality;
private final Random rand;
+ private final double numberOfValuesPerEntry;
+ private final int lengthOfEachString;
+
private List<String> vals;
- public StringGenerator(Integer cardinality) {
- this.cardinality = cardinality.intValue();
+ public StringGenerator(Integer cardinality, Double numberOfValuesPerEntry, Integer lengthOfEachString) {
+ this.cardinality = cardinality;
+ this.numberOfValuesPerEntry =
+ numberOfValuesPerEntry != null ? numberOfValuesPerEntry : DEFAULT_NUMBER_OF_VALUES_PER_ENTRY;
+ this.lengthOfEachString = lengthOfEachString != null ? lengthOfEachString : DEFAULT_LENGTH_OF_EACH_STRING;
+ Preconditions.checkState(this.numberOfValuesPerEntry >= 1,
+ "Number of values per entry (should be >= 1): " + this.numberOfValuesPerEntry);
rand = new Random(System.currentTimeMillis());
}
@@ -56,11 +65,18 @@ public class StringGenerator implements Generator {
@Override
public Object next() {
+ if (numberOfValuesPerEntry == 1) {
+ return getNextString();
+ }
+ return MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, rand, this::getNextString);
+ }
+
+ private String getNextString() {
return vals.get(rand.nextInt(cardinality));
}
public static void main(String[] args) {
- final StringGenerator gen = new StringGenerator(10000);
+ final StringGenerator gen = new StringGenerator(10000, null, null);
gen.init();
for (int i = 0; i < 1000000; i++) {
System.out.println(gen.next());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/TimeGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/TimeGenerator.java
new file mode 100644
index 0000000..29d80e1
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/TimeGenerator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * A class to generate data for a time column
+ */
+public class TimeGenerator implements Generator {
+ private static final double DEFAULT_NUMBER_OF_VALUES_PER_ENTRY = 1;
+
+ private final NumberGenerator _numberGenerator;
+ private final FieldSpec.DataType _dataType;
+ private final Number _initialValue;
+
+ public TimeGenerator(Integer cardinality, FieldSpec.DataType dataType, TimeUnit timeUnit) {
+ _numberGenerator = new NumberGenerator(cardinality, dataType, DEFAULT_NUMBER_OF_VALUES_PER_ENTRY);
+ _dataType = dataType;
+ Date now = new Date();
+ _initialValue = convert(now, timeUnit, dataType);
+ }
+
+ @Override
+ public void init() {
+ _numberGenerator.init();
+ }
+
+ @Override
+ public Object next() {
+ Object next = _numberGenerator.next();
+ if (_dataType == FieldSpec.DataType.LONG) {
+ return ((long) next) + _initialValue.longValue();
+ }
+ return ((int) next) + _initialValue.intValue();
+ }
+
+ @VisibleForTesting
+ static Number convert(Date date, TimeUnit timeUnit, FieldSpec.DataType dataType) {
+ long convertedTime = timeUnit.convert(date.getTime(), TimeUnit.MILLISECONDS);
+ if (dataType == FieldSpec.DataType.LONG) {
+ return convertedTime;
+ }
+ if (dataType == FieldSpec.DataType.INT) {
+ return (int) convertedTime;
+ }
+ throw new IllegalArgumentException("Time column can be only INT or LONG: " + dataType);
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index ef0484f..ca760a2 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -20,23 +20,52 @@ package org.apache.pinot.tools.realtime.provisioning;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.recommender.io.metadata.DateTimeFieldSpecMetadata;
+import org.apache.pinot.controller.recommender.io.metadata.FieldMetadata;
+import org.apache.pinot.controller.recommender.io.metadata.SchemaWithMetaData;
+import org.apache.pinot.controller.recommender.io.metadata.TimeFieldSpecMetadata;
+import org.apache.pinot.controller.recommender.io.metadata.TimeGranularitySpecMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.core.io.readerwriter.RealtimeIndexOffHeapMemoryManager;
import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.DataSizeUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.data.generator.DataGenerator;
+import org.apache.pinot.tools.data.generator.DataGeneratorSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -70,6 +99,9 @@ public class MemoryEstimator {
private String[][] _consumingMemoryPerHost;
private String[][] _numSegmentsQueriedPerHost;
+ /**
+ * Constructor used for processing the given completed segment
+ */
public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, int ingestionRate,
long maxUsableHostMemory, int tableRetentionHours) {
_maxUsableHostMemory = maxUsableHostMemory;
@@ -108,6 +140,18 @@ public class MemoryEstimator {
}
/**
+ * Constructor used for processing the given data characteristics (instead of completed segment)
+ */
+ public MemoryEstimator(TableConfig tableConfig, File schemaWithMetadataFile, int numberOfRows, int ingestionRate,
+ long maxUsableHostMemory, int tableRetentionHours) {
+ this(tableConfig,
+ generateCompletedSegment(schemaWithMetadataFile, tableConfig, numberOfRows),
+ ingestionRate,
+ maxUsableHostMemory,
+ tableRetentionHours);
+ }
+
+ /**
* Initialize the stats file using the sample segment provided.
* <br>This involves indexing each row of the sample segment using MutableSegmentImpl. This is equivalent to consuming the rows of a segment.
* Although they will be in a different order than consumed by the host, the stats should be equivalent.
@@ -387,4 +431,160 @@ public class MemoryEstimator {
public String[][] getNumSegmentsQueriedPerHost() {
return _numSegmentsQueriedPerHost;
}
+
+ private static File generateCompletedSegment(File schemaWithMetadataFile, TableConfig tableConfig, int numberOfRows) {
+ SegmentGenerator segmentGenerator = new SegmentGenerator(schemaWithMetadataFile, tableConfig, numberOfRows, true);
+ return segmentGenerator.generate();
+ }
+
+ /**
+ * This class is used in Memory Estimator to generate a segment based on the given characteristics of data
+ */
+ public static class SegmentGenerator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerator.class);
+ private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
+
+ private File _schemaWithMetadataFile;
+ private TableConfig _tableConfig;
+ private int _numberOfRows;
+ private boolean _deleteCsv;
+
+ public SegmentGenerator(File schemaWithMetadataFile, TableConfig tableConfig, int numberOfRows, boolean deleteCsv) {
+ _schemaWithMetadataFile = schemaWithMetadataFile;
+ _tableConfig = tableConfig;
+ _numberOfRows = numberOfRows;
+ _deleteCsv = deleteCsv;
+ }
+
+ public File generate() {
+ Date now = new Date();
+ File csvDataFile = generateData(now);
+ File segment = createSegment(csvDataFile, now);
+ if (_deleteCsv) {
+ csvDataFile.delete();
+ }
+ return segment;
+ }
+
+ private File generateData(Date now) {
+
+ // deserialize schema
+ SchemaWithMetaData schema;
+ try {
+ schema = JsonUtils.fileToObject(_schemaWithMetadataFile, SchemaWithMetaData.class);
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Cannot read schema file '%s' to SchemaWithMetaData object.",
+ _schemaWithMetadataFile));
+ }
+
+ // create maps of "column name" to ...
+ Map<String, Integer> lengths = new HashMap<>();
+ Map<String, Double> mvCounts = new HashMap<>();
+ Map<String, Integer> cardinalities = new HashMap<>();
+ Map<String, FieldSpec.DataType> dataTypes = new HashMap<>();
+ Map<String, FieldSpec.FieldType> fieldTypes = new HashMap<>();
+ Map<String, TimeUnit> timeUnits = new HashMap<>();
+ List<String> colNames = new ArrayList<>();
+ List<FieldMetadata> dimensions = schema.getDimensionFieldSpecs();
+ List<FieldMetadata> metrics = schema.getMetricFieldSpecs();
+ List<DateTimeFieldSpecMetadata> dateTimes = schema.getDateTimeFieldSpecs();
+ Stream.concat(Stream.concat(dimensions.stream(), metrics.stream()), dateTimes.stream()).forEach(column -> {
+ String name = column.getName();
+ colNames.add(name);
+ lengths.put(name, column.getAverageLength());
+ mvCounts.put(name, column.getNumValuesPerEntry());
+ cardinalities.put(name, column.getCardinality());
+ dataTypes.put(name, column.getDataType());
+ fieldTypes.put(name, column.getFieldType());
+ });
+ dateTimes.forEach(dateTimeColumn -> {
+ TimeUnit timeUnit = new DateTimeFormatSpec(dateTimeColumn.getFormat()).getColumnUnit();
+ timeUnits.put(dateTimeColumn.getName(), timeUnit);
+ });
+ TimeFieldSpecMetadata timeSpec = schema.getTimeFieldSpec();
+ if (timeSpec != null) {
+ String name = timeSpec.getName();
+ colNames.add(name);
+ cardinalities.put(name, timeSpec.getCardinality());
+ dataTypes.put(name, timeSpec.getDataType());
+ fieldTypes.put(name, timeSpec.getFieldType());
+ TimeGranularitySpecMetadata timeGranSpec = timeSpec.getOutgoingGranularitySpec() != null
+ ? timeSpec.getOutgoingGranularitySpec()
+ : timeSpec.getIncomingGranularitySpec();
+ timeUnits.put(name, timeGranSpec.getTimeType());
+ }
+
+ // generate data
+ String outputDir = getOutputDir(now, "-csv");
+ DataGeneratorSpec spec =
+ new DataGeneratorSpec(colNames, cardinalities, new HashMap<>(), new HashMap<>(), mvCounts, lengths, dataTypes,
+ fieldTypes, timeUnits, FileFormat.CSV, outputDir, true);
+ DataGenerator dataGenerator = new DataGenerator();
+ try {
+ dataGenerator.init(spec);
+ dataGenerator.generateCsv(_numberOfRows, 1);
+ File outputFile = Paths.get(outputDir, "output_0.csv").toFile();
+ LOGGER.info("Successfully generated data file: {}", outputFile);
+ return outputFile;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private File createSegment(File csvDataFile, Date now) {
+
+ // deserialize schema
+ Schema schema;
+ try {
+ schema = JsonUtils.fileToObject(_schemaWithMetadataFile, Schema.class);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Cannot read schema file '%s' to schema object.", _schemaWithMetadataFile));
+ }
+
+ // create segment
+ LOGGER.info("Started creating segment from file: {}", csvDataFile);
+ String outDir = getOutputDir(now, "-segment");
+ SegmentGeneratorConfig segmentGeneratorConfig = getSegmentGeneratorConfig(csvDataFile, schema, outDir);
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ try {
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while generating segment from file: " + csvDataFile, e);
+ }
+ String segmentName = driver.getSegmentName();
+ File indexDir = new File(outDir, segmentName);
+ LOGGER.info("Successfully created segment: {} at directory: {}", segmentName, indexDir);
+
+ // verify segment
+ LOGGER.info("Verifying the segment by loading it");
+ ImmutableSegment segment;
+ try {
+ segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while verifying the created segment", e);
+ }
+ LOGGER.info("Successfully loaded segment: {} of size: {} bytes", segmentName,
+ segment.getSegmentSizeBytes());
+ segment.destroy();
+
+ return indexDir;
+ }
+
+ private SegmentGeneratorConfig getSegmentGeneratorConfig(File csvDataFile, Schema schema, String outDir) {
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
+ segmentGeneratorConfig.setInputFilePath(csvDataFile.getPath());
+ segmentGeneratorConfig.setFormat(FileFormat.CSV);
+ segmentGeneratorConfig.setOutDir(outDir);
+ segmentGeneratorConfig.setReaderConfig(new CSVRecordReaderConfig());
+ segmentGeneratorConfig.setTableName(_tableConfig.getTableName());
+ segmentGeneratorConfig.setSequenceId(0);
+ return segmentGeneratorConfig;
+ }
+
+ private String getOutputDir(Date date, String suffix) {
+ return Paths.get(System.getProperty("java.io.tmpdir"), DATE_FORMAT.format(date) + suffix).toString();
+ }
+ }
}
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelperTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelperTest.java
new file mode 100644
index 0000000..6a98074
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/MultiValueGeneratorHelperTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.function.Supplier;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+
+public class MultiValueGeneratorHelperTest {
+
+ @Test
+ public void testGenerateMultiValueEntries() {
+
+ Random rand = mock(Random.class);
+ when(rand.nextInt()).thenReturn(10, 20, 30, 40, 50, 60, 70);
+ when(rand.nextDouble()).thenReturn(0.4);
+ Supplier<Object> next = rand::nextInt;
+
+ double numberOfValuesPerEntry = 3.3;
+ assertEquals(MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, rand, next),
+ Arrays.asList(10, 20, 30));
+
+ numberOfValuesPerEntry = 3.5;
+ assertEquals(MultiValueGeneratorHelper.generateMultiValueEntries(numberOfValuesPerEntry, rand, next),
+ Arrays.asList(40, 50, 60, 70));
+ }
+}
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/TimeGeneratorTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/TimeGeneratorTest.java
new file mode 100644
index 0000000..b235a80
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/data/generator/TimeGeneratorTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.data.generator;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeGeneratorTest {
+
+ @Test
+ public void testConvert() {
+ int oneHourInMillis = 60 * 60 * 1000;
+ Date date = new Date(oneHourInMillis + 1);
+
+ // seconds
+ Number convertedTime = TimeGenerator.convert(date, TimeUnit.SECONDS, FieldSpec.DataType.LONG);
+ assertTrue(convertedTime instanceof Long);
+ assertEquals(3600L, convertedTime);
+
+ // minutes
+ convertedTime = TimeGenerator.convert(date, TimeUnit.MINUTES, FieldSpec.DataType.INT);
+ assertTrue(convertedTime instanceof Integer);
+ assertEquals(60, convertedTime);
+
+ // hours
+ convertedTime = TimeGenerator.convert(date, TimeUnit.HOURS, FieldSpec.DataType.INT);
+ assertTrue(convertedTime instanceof Integer);
+ assertEquals(1, convertedTime);
+ }
+}
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimatorTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimatorTest.java
new file mode 100644
index 0000000..46f2727
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimatorTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.realtime.provisioning;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class MemoryEstimatorTest {
+
+ @Test
+ public void testSegmentGenerator() throws Exception {
+ runTest("memory_estimation/schema-with-metadata.json", metadata -> {
+ assertEquals(extract(metadata, "segment.total.docs = (\\d+)"), "10000");
+ assertEquals(extract(metadata, "column.colInt.cardinality = (\\d+)"), "100");
+ assertEquals(extract(metadata, "column.colIntMV.cardinality = (\\d+)"), "150");
+ assertEquals(extract(metadata, "column.colFloat.cardinality = (\\d+)"), "200");
+ assertEquals(extract(metadata, "column.colFloatMV.cardinality = (\\d+)"), "250");
+ assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "300");
+ assertEquals(extract(metadata, "column.colStringMV.cardinality = (\\d+)"), "350");
+ assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "400");
+ assertEquals(extract(metadata, "column.colLong.cardinality = (\\d+)"), "500");
+ assertEquals(extract(metadata, "column.colLongMV.cardinality = (\\d+)"), "550");
+ assertEquals(extract(metadata, "column.colDouble.cardinality = (\\d+)"), "600");
+ assertEquals(extract(metadata, "column.colDoubleMV.cardinality = (\\d+)"), "650");
+ assertEquals(extract(metadata, "column.colDoubleMetric.cardinality = (\\d+)"), "700");
+ assertEquals(extract(metadata, "column.colFloatMetric.cardinality = (\\d+)"), "800");
+ assertEquals(extract(metadata, "column.colTime.cardinality = (\\d+)"), "900");
+ assertEquals(extract(metadata, "column.colInt.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colIntMV.maxNumberOfMultiValues = (\\d+)"), "3");
+ assertEquals(extract(metadata, "column.colFloat.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colFloatMV.maxNumberOfMultiValues = (\\d+)"), "2");
+ assertEquals(extract(metadata, "column.colString.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colStringMV.maxNumberOfMultiValues = (\\d+)"), "2");
+ assertEquals(extract(metadata, "column.colBytes.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colLong.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colLongMV.maxNumberOfMultiValues = (\\d+)"), "3");
+ assertEquals(extract(metadata, "column.colDouble.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colDoubleMV.maxNumberOfMultiValues = (\\d+)"), "4");
+ assertEquals(extract(metadata, "column.colDoubleMetric.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colFloatMetric.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colTime.maxNumberOfMultiValues = (\\d+)"), "0");
+ });
+ }
+
+ @Test
+ public void testSegmentGenerator_withDateTimeFieldSpec() throws Exception {
+ runTest("memory_estimation/schema-with-metadata__dateTimeFieldSpec.json", metadata -> {
+ assertEquals(extract(metadata, "segment.total.docs = (\\d+)"), "10000");
+ assertEquals(extract(metadata, "column.colInt.cardinality = (\\d+)"), "500");
+ assertEquals(extract(metadata, "column.colFloat.cardinality = (\\d+)"), "600");
+ assertEquals(extract(metadata, "column.colString.cardinality = (\\d+)"), "700");
+ assertEquals(extract(metadata, "column.colBytes.cardinality = (\\d+)"), "800");
+ assertEquals(extract(metadata, "column.colMetric.cardinality = (\\d+)"), "900");
+ assertEquals(extract(metadata, "column.colTime.cardinality = (\\d+)"), "250");
+ assertEquals(extract(metadata, "column.colTime2.cardinality = (\\d+)"), "750");
+ assertEquals(extract(metadata, "column.colInt.maxNumberOfMultiValues = (\\d+)"), "3");
+ assertEquals(extract(metadata, "column.colFloat.maxNumberOfMultiValues = (\\d+)"), "2");
+ assertEquals(extract(metadata, "column.colString.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colBytes.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colMetric.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colTime.maxNumberOfMultiValues = (\\d+)"), "0");
+ assertEquals(extract(metadata, "column.colTime2.maxNumberOfMultiValues = (\\d+)"), "0");
+ });
+ }
+
+ private void runTest(String schemaFileName, Consumer<String> assertFunc) throws Exception {
+
+ // arrange inputs
+ File schemaFile = readFile(schemaFileName);
+ File tableConfigFile = readFile("memory_estimation/table-config.json");
+ TableConfig tableConfig = JsonUtils.fileToObject(tableConfigFile, TableConfig.class);
+ int numberOfRows = 10_000;
+
+ // act
+ MemoryEstimator.SegmentGenerator
+ segmentGenerator = new MemoryEstimator.SegmentGenerator(schemaFile, tableConfig, numberOfRows, true);
+ File generatedSegment = segmentGenerator.generate();
+
+ // assert
+ Path metadataFile = Paths.get(generatedSegment.getPath(), "v3", "metadata.properties");
+ String metadata = new String(Files.readAllBytes(metadataFile));
+ assertFunc.accept(metadata);
+
+ // cleanup
+ FileUtils.deleteDirectory(generatedSegment);
+ }
+
+ private String extract(String metadataContent, String patternStr) {
+ Pattern pattern = Pattern.compile(patternStr);
+ Matcher matcher = pattern.matcher(metadataContent);
+ matcher.find();
+ return matcher.group(1);
+ }
+
+ private File readFile(String fileName) throws Exception {
+ URL resource = getClass().getClassLoader().getResource(fileName);
+ return new File(resource.toURI());
+ }
+}
diff --git a/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata.json b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata.json
new file mode 100644
index 0000000..e048d87
--- /dev/null
+++ b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata.json
@@ -0,0 +1,93 @@
+{
+ "schemaName": "testTable",
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "INT",
+ "name": "colInt",
+ "cardinality": 100
+ },
+ {
+ "dataType": "INT",
+ "name": "colIntMV",
+ "singleValueField": false,
+ "cardinality": 150,
+ "numValuesPerEntry": 3
+ },
+ {
+ "dataType": "FLOAT",
+ "name": "colFloat",
+ "cardinality": 200
+ },
+ {
+ "dataType": "FLOAT",
+ "name": "colFloatMV",
+ "singleValueField": false,
+ "cardinality": 250,
+ "numValuesPerEntry": 1.7
+ },
+ {
+ "dataType": "STRING",
+ "name": "colString",
+ "cardinality": 300,
+ "averageLength": 10
+ },
+ {
+ "dataType": "STRING",
+ "name": "colStringMV",
+ "singleValueField": false,
+ "cardinality": 350,
+ "averageLength": 10,
+ "numValuesPerEntry": 1.3
+ },
+ {
+ "dataType": "BYTES",
+ "name": "colBytes",
+ "cardinality": 400,
+ "averageLength": 5
+ },
+ {
+ "dataType": "LONG",
+ "name": "colLong",
+ "cardinality": 500
+ },
+ {
+ "dataType": "LONG",
+ "name": "colLongMV",
+ "singleValueField": false,
+ "cardinality": 550,
+ "numValuesPerEntry": 2.8
+ },
+ {
+ "dataType": "DOUBLE",
+ "name": "colDouble",
+ "cardinality": 600
+ },
+ {
+ "dataType": "DOUBLE",
+ "name": "colDoubleMV",
+ "singleValueField": false,
+ "cardinality": 650,
+ "numValuesPerEntry": 3.4
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "dataType": "DOUBLE",
+ "name": "colDoubleMetric",
+ "cardinality": 700
+ },
+ {
+ "dataType": "FLOAT",
+ "name": "colFloatMetric",
+ "cardinality": 800
+ }
+ ],
+ "timeFieldSpec": {
+ "incomingGranularitySpec" : {
+ "dataType": "LONG",
+ "name": "colTime",
+ "timeType": "DAYS",
+ "cardinality": 900
+ }
+ }
+}
diff --git a/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata__dateTimeFieldSpec.json b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata__dateTimeFieldSpec.json
new file mode 100644
index 0000000..c331a37
--- /dev/null
+++ b/pinot-tools/src/test/resources/memory_estimation/schema-with-metadata__dateTimeFieldSpec.json
@@ -0,0 +1,54 @@
+{
+ "schemaName": "testTable",
+ "dimensionFieldSpecs": [
+ {
+ "dataType": "INT",
+ "name": "colInt",
+ "singleValueField": false,
+ "cardinality": 500,
+ "numValuesPerEntry": 3
+ },
+ {
+ "dataType": "FLOAT",
+ "name": "colFloat",
+ "singleValueField": false,
+ "cardinality": 600,
+ "numValuesPerEntry": 1.7
+ },
+ {
+ "dataType": "STRING",
+ "name": "colString",
+ "cardinality": 700,
+ "averageLength": 10
+ },
+ {
+ "dataType": "BYTES",
+ "name": "colBytes",
+ "cardinality": 800,
+ "averageLength": 5
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "dataType": "DOUBLE",
+ "name": "colMetric",
+ "cardinality": 900
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "colTime",
+ "dataType": "INT",
+ "format": "1:DAYS:EPOCH",
+ "granularity": "1:DAYS",
+ "cardinality": 250
+ },
+ {
+ "name": "colTime2",
+ "dataType": "LONG",
+ "format": "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS",
+ "cardinality": 750
+ }
+ ]
+}
diff --git a/pinot-tools/src/test/resources/memory_estimation/table-config.json b/pinot-tools/src/test/resources/memory_estimation/table-config.json
new file mode 100644
index 0000000..5e06e3f
--- /dev/null
+++ b/pinot-tools/src/test/resources/memory_estimation/table-config.json
@@ -0,0 +1,49 @@
+{
+ "metadata": {},
+ "routing": {
+ "routingTableBuilderName": "PartitionAwareRealtime",
+ "routingTableBuilderOptions": {}
+ },
+ "segmentsConfig": {
+ "replicasPerPartition": 3,
+ "replication": "3",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "5",
+ "schemaName": "testTable",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "segmentPushFrequency": "daily",
+ "segmentPushType": "APPEND",
+ "timeColumnName": "colTime",
+ "timeType": "DAYS"
+ },
+ "tableIndexConfig": {
+ "aggregateMetrics": true,
+ "invertedIndexColumns": [
+ "colInt",
+ "colString"
+ ],
+ "loadMode": "MMAP",
+ "noDictionaryColumns": [
+ "colBytes"
+ ],
+ "segmentFormatVersion": "v3",
+ "sortedColumn": [],
+ "streamConfigs": {
+ "realtime.segment.flush.threshold.size": 100000000,
+ "realtime.segment.flush.threshold.time": "6h",
+ "stream.kafka.clusterGroup": "aggregate-tracking",
+ "stream.kafka.consumer.factory.class.name": "com.linkedin.pinot.v2.server.LiPinotKafkaConsumerFactory",
+ "stream.kafka.consumer.prop.auto.offset.reset": "largest",
+ "stream.kafka.consumer.type": "simple",
+ "stream.kafka.decoder.class.name": "com.linkedin.pinot.v2.server.LiKafkaDecoder",
+ "stream.kafka.topic.name": "UserGeneratedContentGestureCountEvent",
+ "streamType": "kafka"
+ }
+ },
+ "tableName": "testTable",
+ "tableType": "REALTIME",
+ "tenants": {
+ "broker": "test",
+ "server": "test"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org