You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/12/26 07:51:21 UTC
(pinot) branch master updated: Data generator reorganisation (#12122)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a20031528d Data generator reorganisation (#12122)
a20031528d is described below
commit a20031528da3c29371f3122eccf745061905a00d
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Tue Dec 26 13:21:16 2023 +0530
Data generator reorganisation (#12122)
---
.../recommender/data/DataGenerationHelpers.java | 119 +++++++++++++++++++++
.../recommender/data/generator/DataGenerator.java | 88 +++------------
.../data/generator/DataGeneratorSpec.java | 28 ++++-
.../data/{generator => writer}/AvroWriter.java | 72 ++++++++++---
.../recommender/data/writer/AvroWriterSpec.java | 56 ++++++++++
.../recommender/data/writer/CsvWriter.java | 52 +++++++++
.../recommender/data/writer/FileWriter.java | 74 +++++++++++++
.../recommender/data/writer/FileWriterSpec.java | 47 ++++++++
.../recommender/data/writer/JsonWriter.java | 43 ++++++++
.../controller/recommender/data/writer/Writer.java | 49 +++++++++
.../recommender/data/writer/WriterSpec.java | 34 ++++++
.../realtime/provisioning/MemoryEstimator.java | 5 +-
.../tools/admin/command/GenerateDataCommand.java | 60 ++---------
13 files changed, 577 insertions(+), 150 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java
new file mode 100644
index 0000000000..b4017abfa8
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.math.IntRange;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
+import org.apache.pinot.controller.recommender.data.writer.AvroWriter;
+import org.apache.pinot.controller.recommender.data.writer.AvroWriterSpec;
+import org.apache.pinot.controller.recommender.data.writer.CsvWriter;
+import org.apache.pinot.controller.recommender.data.writer.FileWriterSpec;
+import org.apache.pinot.controller.recommender.data.writer.JsonWriter;
+import org.apache.pinot.controller.recommender.data.writer.Writer;
+import org.apache.pinot.controller.recommender.data.writer.WriterSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeFieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helpers that connect a {@link DataGenerator} to one of the file {@link Writer}s (Avro, CSV or JSON),
+ * plus a builder that derives a {@link DataGeneratorSpec} from a Pinot {@link Schema}.
+ */
+public final class DataGenerationHelpers {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerationHelpers.class);
+
+  private DataGenerationHelpers() {
+  }
+
+  /**
+   * Writes {@code totalDocs} rows produced by {@code generator} as Avro files into {@code outDir}.
+   *
+   * @param isOverrideOutDir whether an existing {@code outDir} may be deleted and recreated
+   * @throws RuntimeException if {@code outDir} exists and {@code isOverrideOutDir} is false
+   */
+  public static void generateAvro(DataGenerator generator, long totalDocs, int numFiles, String outDir,
+      boolean isOverrideOutDir)
+      throws Exception {
+    File baseDir = handleOutDir(outDir, isOverrideOutDir);
+    write(new AvroWriter(), new AvroWriterSpec(generator, baseDir, totalDocs, numFiles));
+  }
+
+  /**
+   * Writes {@code totalDocs} rows produced by {@code generator} as CSV files into {@code outDir}.
+   *
+   * @param isOverrideOutDir whether an existing {@code outDir} may be deleted and recreated
+   * @throws RuntimeException if {@code outDir} exists and {@code isOverrideOutDir} is false
+   */
+  public static void generateCsv(DataGenerator generator, long totalDocs, int numFiles, String outDir,
+      boolean isOverrideOutDir)
+      throws Exception {
+    File baseDir = handleOutDir(outDir, isOverrideOutDir);
+    write(new CsvWriter(), new FileWriterSpec(generator, baseDir, totalDocs, numFiles));
+  }
+
+  /**
+   * Writes {@code totalDocs} rows produced by {@code generator} as JSON files into {@code outDir}.
+   *
+   * @param isOverrideOutDir whether an existing {@code outDir} may be deleted and recreated
+   * @throws RuntimeException if {@code outDir} exists and {@code isOverrideOutDir} is false
+   */
+  public static void generateJson(DataGenerator generator, long totalDocs, int numFiles, String outDir,
+      boolean isOverrideOutDir)
+      throws Exception {
+    File baseDir = handleOutDir(outDir, isOverrideOutDir);
+    write(new JsonWriter(), new FileWriterSpec(generator, baseDir, totalDocs, numFiles));
+  }
+
+  /** Initialises {@code writer} with {@code spec} and runs it. */
+  private static void write(Writer writer, WriterSpec spec)
+      throws Exception {
+    writer.init(spec);
+    writer.write();
+  }
+
+  /**
+   * Prepares an empty output directory: deletes {@code outDir} first if it exists and overriding is allowed,
+   * then (re)creates it together with any missing parent directories.
+   *
+   * @throws RuntimeException if {@code outDir} exists and {@code isOverrideOutDir} is false
+   * @throws IOException if the directory cannot be deleted or created
+   */
+  private static File handleOutDir(String outDir, boolean isOverrideOutDir)
+      throws IOException {
+    File dir = new File(outDir);
+    if (dir.exists()) {
+      if (!isOverrideOutDir) {
+        LOGGER.error("output directory already exists, and override is set to false");
+        throw new RuntimeException("output directory exists");
+      }
+      FileUtils.deleteDirectory(dir);
+    }
+    // mkdirs() also creates missing parent directories. Check its result so that a failure is reported here
+    // instead of surfacing later when the writers try to create their output files.
+    if (!dir.mkdirs()) {
+      throw new IOException("Unable to create output directory: " + dir.getAbsolutePath());
+    }
+    return dir;
+  }
+
+  /**
+   * Builds a {@link DataGeneratorSpec} covering every field of {@code schema}.
+   *
+   * <p>The passed-in collections are populated in place: each column is appended to {@code columns} and its data
+   * and field types are recorded. Defaults are added only where the caller supplied none: cardinality 1000 for
+   * dimension columns and value range [1, 1000] for metric and time columns. Time columns always get their time
+   * unit from the incoming granularity spec; DATE_TIME and COMPLEX columns get no defaults.
+   *
+   * @throws RuntimeException if the schema contains an unsupported field type
+   */
+  public static DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
+      HashMap<String, FieldSpec.DataType> dataTypes, HashMap<String, FieldSpec.FieldType> fieldTypes,
+      HashMap<String, TimeUnit> timeUnits, HashMap<String, Integer> cardinality, HashMap<String, IntRange> range,
+      HashMap<String, Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
+    for (final FieldSpec fs : schema.getAllFieldSpecs()) {
+      String col = fs.getName();
+      columns.add(col);
+      dataTypes.put(col, fs.getDataType());
+      fieldTypes.put(col, fs.getFieldType());
+
+      switch (fs.getFieldType()) {
+        case DIMENSION:
+          cardinality.putIfAbsent(col, 1000);
+          break;
+        case METRIC:
+          range.putIfAbsent(col, new IntRange(1, 1000));
+          break;
+        case TIME:
+          range.putIfAbsent(col, new IntRange(1, 1000));
+          TimeFieldSpec tfs = (TimeFieldSpec) fs;
+          timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType());
+          break;
+
+        // forward compatibility with pattern generator
+        case DATE_TIME:
+        case COMPLEX:
+          break;
+        default:
+          throw new RuntimeException("Invalid field type.");
+      }
+    }
+    return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes,
+        timeUnits);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java
index 8069813a02..4c818846ed 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java
@@ -18,19 +18,16 @@
*/
package org.apache.pinot.controller.recommender.data.generator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.IntRange;
+import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -39,7 +36,6 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.FileFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +46,6 @@ import org.slf4j.LoggerFactory;
// TODO: add DATE_TIME to the data generator
public class DataGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerator.class);
- private File _outDir;
DataGeneratorSpec _genSpec;
@@ -63,17 +58,6 @@ public class DataGenerator {
public void init(DataGeneratorSpec spec)
throws IOException {
_genSpec = spec;
- _outDir = new File(_genSpec.getOutputDir());
- if (_outDir.exists() && !_genSpec.isOverrideOutDir()) {
- LOGGER.error("output directory already exists, and override is set to false");
- throw new RuntimeException("output directory exists");
- }
-
- if (_outDir.exists()) {
- FileUtils.deleteDirectory(_outDir);
- }
-
- _outDir.mkdir();
for (final String column : _genSpec.getColumns()) {
DataType dataType = _genSpec.getDataTypeMap().get(column);
@@ -99,59 +83,17 @@ public class DataGenerator {
}
}
- public void generateAvro(long totalDocs, int numFiles)
- throws IOException {
- final int numPerFiles = (int) (totalDocs / numFiles);
- for (int i = 0; i < numFiles; i++) {
- try (AvroWriter writer = new AvroWriter(_outDir, i, _generators, fetchSchema())) {
- for (int j = 0; j < numPerFiles; j++) {
- writer.writeNext();
- }
- }
- }
- }
-
- public void generateCsv(long totalDocs, int numFiles)
- throws IOException {
- final int numPerFiles = (int) (totalDocs / numFiles);
- for (int i = 0; i < numFiles; i++) {
- try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.csv", i)))) {
- writer.append(StringUtils.join(_genSpec.getColumns(), ",")).append('\n');
- for (int j = 0; j < numPerFiles; j++) {
- Object[] values = new Object[_genSpec.getColumns().size()];
- for (int k = 0; k < _genSpec.getColumns().size(); k++) {
- Object next = _generators.get(_genSpec.getColumns().get(k)).next();
- values[k] = serializeIfMultiValue(next);
- }
- writer.append(StringUtils.join(values, ",")).append('\n');
- }
- }
- }
- }
-
- public void generateJson(long totalDocs, int numFiles)
- throws IOException {
- final int numPerFiles = (int) (totalDocs / numFiles);
- final ObjectMapper mapper = new ObjectMapper();
- for (int i = 0; i < numFiles; i++) {
- try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.json", i)))) {
- for (int j = 0; j < numPerFiles; j++) {
- Map<String, Object> row = new HashMap<>();
- for (int k = 0; k < _genSpec.getColumns().size(); k++) {
- String key = _genSpec.getColumns().get(k);
- row.put(key, _generators.get(key).next());
- }
- writer.append(mapper.writeValueAsString(row)).append('\n');
- }
- }
- }
- }
-
- private Object serializeIfMultiValue(Object obj) {
- if (obj instanceof List) {
- return StringUtils.join((List) obj, ";");
+ /*
+ * Returns a LinkedHashMap of columns and their respective generated values.
+ * This ensures that the entries are ordered as per the column list
+ *
+ * */
+ public Map<String, Object> nextRow() {
+ Map<String, Object> row = new LinkedHashMap<>();
+ for (String key : _genSpec.getColumns()) {
+ row.put(key, _generators.get(key).next());
}
- return obj;
+ return row;
}
public Schema fetchSchema() {
@@ -193,7 +135,7 @@ public class DataGenerator {
}
public static void main(String[] args)
- throws IOException {
+ throws Exception {
final Map<String, DataType> dataTypes = new HashMap<>();
final Map<String, FieldType> fieldTypes = new HashMap<>();
@@ -257,11 +199,11 @@ public class DataGenerator {
String outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "csv-data").toString();
final DataGeneratorSpec spec =
new DataGeneratorSpec(columnNames, cardinality, range, template, mvCountMap, lengthMap, dataTypes, fieldTypes,
- timeUnits, FileFormat.CSV, outputDir, true);
+ timeUnits);
final DataGenerator gen = new DataGenerator();
gen.init(spec);
- gen.generateCsv(100, 1);
+ DataGenerationHelpers.generateCsv(gen, 100, 1, outputDir, true);
System.out.println("CSV data is generated under: " + outputDir);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java
index d89ea66968..f64a7a984b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java
@@ -42,15 +42,20 @@ public class DataGeneratorSpec {
private final Map<String, FieldType> _fieldTypeMap;
private final Map<String, TimeUnit> _timeUnitMap;
- private final FileFormat _outputFileFormat;
- private final String _outputDir;
- private final boolean _overrideOutDir;
-
+ @Deprecated
+ private FileFormat _outputFileFormat;
+ @Deprecated
+ private String _outputDir;
+ @Deprecated
+ private boolean _overrideOutDir;
+
+ @Deprecated
public DataGeneratorSpec() {
this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
new HashMap<>(), new HashMap<>(), new HashMap<>(), FileFormat.AVRO, "/tmp/dataGen", true);
}
+ @Deprecated
public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap,
Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
@@ -71,6 +76,21 @@ public class DataGeneratorSpec {
_timeUnitMap = timeUnitMap;
}
+ public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
+ Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap,
+ Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap) {
+ _columns = columns;
+ _cardinalityMap = cardinalityMap;
+ _rangeMap = rangeMap;
+ _patternMap = patternMap;
+ _mvCountMap = mvCountMap;
+ _lengthMap = lengthMap;
+
+ _dataTypeMap = dataTypesMap;
+ _fieldTypeMap = fieldTypesMap;
+ _timeUnitMap = timeUnitMap;
+ }
+
public Map<String, DataType> getDataTypeMap() {
return _dataTypeMap;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java
similarity index 59%
rename from pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java
rename to pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java
index 92bcf3be4d..1295b8991d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.controller.recommender.data.generator;
+package org.apache.pinot.controller.recommender.data.writer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -25,6 +25,7 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -32,20 +33,13 @@ import org.apache.pinot.plugin.inputformat.avro.AvroSchemaUtil;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class AvroWriter implements Closeable {
- private final Map<String, Generator> _generatorMap;
- private final org.apache.avro.Schema _avroSchema;
- private final DataFileWriter<GenericData.Record> _recordWriter;
-
- public AvroWriter(File baseDir, int index, Map<String, Generator> generatorMap, Schema schema)
- throws IOException {
- _generatorMap = generatorMap;
- _avroSchema = getAvroSchema(schema);
- _recordWriter = new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(_avroSchema));
- _recordWriter.create(_avroSchema, new File(baseDir, "part-" + index + ".avro"));
- }
+public class AvroWriter implements Writer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class);
+ private AvroWriterSpec _spec;
public static org.apache.avro.Schema getAvroSchema(Schema schema) {
ObjectNode avroSchema = JsonUtils.newObjectNode();
@@ -62,12 +56,56 @@ public class AvroWriter implements Closeable {
return new org.apache.avro.Schema.Parser().parse(avroSchema.toString());
}
- public void writeNext()
+ @Override
+ public void init(WriterSpec spec) {
+ _spec = (AvroWriterSpec) spec;
+ }
+
+ @Override
+ public void write()
throws IOException {
- GenericData.Record nextRecord = new GenericData.Record(_avroSchema);
- for (String column : _generatorMap.keySet()) {
- nextRecord.put(column, _generatorMap.get(column).next());
+ final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles());
+ for (int i = 0; i < _spec.getNumFiles(); i++) {
+ try (AvroRecordAppender appender = new AvroRecordAppender(
+ new File(_spec.getBaseDir(), "part-" + i + ".avro"), getAvroSchema(_spec.getSchema()))) {
+ for (int j = 0; j < numPerFiles; j++) {
+ appender.append(_spec.getGenerator().nextRow());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ File baseDir = new File(_spec.getBaseDir().toURI());
+ for (File file : Objects.requireNonNull(baseDir.listFiles())) {
+ if (!file.delete()) {
+ LOGGER.error("Unable to delete file {}", file.getAbsolutePath());
+ }
+ }
+ if (!baseDir.delete()) {
+ LOGGER.error("Unable to delete directory {}", baseDir.getAbsolutePath());
}
+ }
+}
+
+class AvroRecordAppender implements Closeable {
+ private final DataFileWriter<GenericData.Record> _recordWriter;
+ private final org.apache.avro.Schema _avroSchema;
+
+ public AvroRecordAppender(File file, org.apache.avro.Schema avroSchema)
+ throws IOException {
+ _avroSchema = avroSchema;
+ _recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(_avroSchema));
+ _recordWriter.create(_avroSchema, file);
+ }
+
+ public void append(Map<String, Object> record)
+ throws IOException {
+ GenericData.Record nextRecord = new GenericData.Record(_avroSchema);
+ record.forEach((column, value) -> {
+ nextRecord.put(column, record.get(column));
+ });
_recordWriter.append(nextRecord);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java
new file mode 100644
index 0000000000..a9ce271130
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.io.File;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * {@link FileWriterSpec} for {@link AvroWriter}. In addition to the output directory and the document/file
+ * counts it holds the Pinot {@link Schema} of the generated rows, fetched from the generator when the spec is
+ * created, from which the Avro schema of the output files is derived.
+ */
+public class AvroWriterSpec extends FileWriterSpec {
+  private final Schema _schema;
+
+  public AvroWriterSpec(DataGenerator generator, File baseDir, long totalDocs, int numFiles) {
+    super(generator, baseDir, totalDocs, numFiles);
+    _schema = generator.fetchSchema();
+  }
+
+  /** Returns the Pinot schema of the generated rows, captured when this spec was created. */
+  public Schema getSchema() {
+    return _schema;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
new file mode 100644
index 0000000000..88547cc757
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * {@link FileWriter} producing comma-separated {@code .csv} files that start with a header line of column names.
+ * Multi-value cells are flattened into one field with the values separated by {@code ;}.
+ *
+ * <p>NOTE(review): values are written verbatim (no quoting/escaping), so a value containing ',', ';' or a line
+ * break would corrupt its row.
+ */
+public class CsvWriter extends FileWriter {
+  @Override
+  protected String getHeader(DataGenerator generator) {
+    // nextRow() returns an insertion-ordered map keyed by column name, so its key set gives the column order.
+    // Note that this consumes one generated row.
+    return StringUtils.join(generator.nextRow().keySet(), ",");
+  }
+
+  @Override
+  protected String generateRow(DataGenerator generator) {
+    Map<String, Object> row = generator.nextRow();
+    Object[] values = new Object[row.size()];
+    int index = 0;
+    for (Object value : row.values()) {
+      values[index++] = serializeIfMultiValue(value);
+    }
+    return StringUtils.join(values, ",");
+  }
+
+  /** Joins a multi-value (List) cell with {@code ;}; any other value is returned unchanged. */
+  private static Object serializeIfMultiValue(Object obj) {
+    if (obj instanceof List) {
+      return StringUtils.join((List<?>) obj, ";");
+    }
+    return obj;
+  }
+
+  @Override
+  protected String getExtension() {
+    return "csv";
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
new file mode 100644
index 0000000000..b74c2ebbde
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for {@link Writer}s that write one text line per generated row into {@code numFiles} files named
+ * {@code output_<i>[.<extension>]} inside the spec's base directory.
+ *
+ * <p>Subclasses format rows in {@link #generateRow(DataGenerator)} and may provide a header line through
+ * {@link #getHeader(DataGenerator)} and a file extension through {@link #getExtension()}.
+ */
+public abstract class FileWriter implements Writer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileWriter.class);
+
+  private FileWriterSpec _spec;
+
+  /** @param spec must be a {@link FileWriterSpec} */
+  @Override
+  public void init(WriterSpec spec) {
+    _spec = (FileWriterSpec) spec;
+  }
+
+  /**
+   * Writes {@code totalDocs} rows spread over {@code numFiles} UTF-8 encoded files. When {@code totalDocs} is not
+   * a multiple of {@code numFiles}, the first {@code totalDocs % numFiles} files receive one extra row, so exactly
+   * {@code totalDocs} rows are written.
+   *
+   * @throws IllegalArgumentException if {@code numFiles} is not positive or {@code totalDocs} is negative
+   */
+  @Override
+  public void write()
+      throws Exception {
+    int numFiles = _spec.getNumFiles();
+    long totalDocs = _spec.getTotalDocs();
+    if (numFiles <= 0) {
+      throw new IllegalArgumentException("numFiles must be positive, got: " + numFiles);
+    }
+    if (totalDocs < 0) {
+      throw new IllegalArgumentException("totalDocs must not be negative, got: " + totalDocs);
+    }
+    DataGenerator generator = _spec.getGenerator();
+    long docsPerFile = totalDocs / numFiles;
+    long remainder = totalDocs % numFiles;
+    String header = getHeader(generator);
+    String extension = getExtension() == null ? "" : "." + getExtension();
+    for (int i = 0; i < numFiles; i++) {
+      File outputFile = new File(_spec.getBaseDir(), "output_" + i + extension);
+      long numDocs = i < remainder ? docsPerFile + 1 : docsPerFile;
+      // Use an explicit UTF-8 charset instead of java.io.FileWriter's platform default.
+      try (BufferedWriter out = Files.newBufferedWriter(outputFile.toPath(), StandardCharsets.UTF_8)) {
+        // Formats without a header (e.g. JSON lines) return null from getHeader().
+        if (header != null) {
+          out.write(header);
+          out.write('\n');
+        }
+        for (long j = 0; j < numDocs; j++) {
+          out.write(generateRow(generator));
+          out.write('\n');
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the header line written at the top of every output file, or {@code null} (the default) when the
+   * format has no header. Called once per {@link #write()}.
+   */
+  protected String getHeader(DataGenerator generator) {
+    return null;
+  }
+
+  /** Returns the output file extension without the leading dot, or {@code null} (the default) for none. */
+  protected String getExtension() {
+    return null;
+  }
+
+  /**
+   * Deletes the files directly inside the base directory and then the directory itself. Failures are logged,
+   * not thrown.
+   */
+  @Override
+  public void cleanup() {
+    File baseDir = _spec.getBaseDir();
+    File[] files = baseDir.listFiles();
+    if (files == null) {
+      // listFiles() returns null when the path does not denote a directory or an I/O error occurs.
+      LOGGER.warn("Nothing to clean up, {} is not a readable directory", baseDir.getAbsolutePath());
+      return;
+    }
+    for (File file : files) {
+      if (!file.delete()) {
+        LOGGER.error("Unable to delete file {}", file.getAbsolutePath());
+      }
+    }
+    if (!baseDir.delete()) {
+      LOGGER.error("Unable to delete directory {}", baseDir.getAbsolutePath());
+    }
+  }
+
+  /** Generates the next row with {@code generator} and formats it as a single line (without line terminator). */
+  protected abstract String generateRow(DataGenerator generator);
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java
new file mode 100644
index 0000000000..a7ef8cd496
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.io.File;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * {@link WriterSpec} for writers that produce files: the directory to write into, the total number of rows to
+ * generate and the number of files to spread them over.
+ */
+public class FileWriterSpec extends WriterSpec {
+  private final File _baseDir;
+  private final long _totalDocs;
+  private final int _numFiles;
+
+  /**
+   * @param generator produces the rows to write
+   * @param baseDir directory in which the output files are created
+   * @param totalDocs number of rows to generate across all files
+   * @param numFiles number of output files
+   */
+  public FileWriterSpec(DataGenerator generator, File baseDir, long totalDocs, int numFiles) {
+    super(generator);
+    _numFiles = numFiles;
+    _totalDocs = totalDocs;
+    _baseDir = baseDir;
+  }
+
+  /** Returns the number of output files. */
+  public int getNumFiles() {
+    return _numFiles;
+  }
+
+  /** Returns the total number of rows to generate. */
+  public long getTotalDocs() {
+    return _totalDocs;
+  }
+
+  /** Returns the directory in which the output files are created. */
+  public File getBaseDir() {
+    return _baseDir;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java
new file mode 100644
index 0000000000..f867df9f29
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * {@link FileWriter} producing {@code .json} files in which every generated row is serialized as a JSON object on
+ * its own line.
+ */
+public class JsonWriter extends FileWriter {
+  // ObjectMapper is costly to create and thread-safe once configured, so a single shared instance is reused
+  // for every row.
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Override
+  protected String generateRow(DataGenerator generator) {
+    Map<String, Object> row = generator.nextRow();
+    try {
+      return MAPPER.writeValueAsString(row);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Issue while processing the json entry.", e);
+    }
+  }
+
+  @Override
+  protected String getExtension() {
+    return "json";
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java
new file mode 100644
index 0000000000..b64aef01ae
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * Sink for generated data. An implementation pulls rows from the {@link DataGenerator} carried by its
+ * {@link WriterSpec} and persists them to a concrete data source.
+ *
+ * <p>Typical lifecycle: {@link #init(WriterSpec)}, then {@link #write()}, and optionally {@link #cleanup()} to
+ * remove what was written.
+ */
+public interface Writer {
+
+  /**
+   * Configures this writer. Implementations may require a specific {@link WriterSpec} subclass.
+   *
+   * @param spec carries the {@link DataGenerator} used to produce the rows to write
+   */
+  void init(WriterSpec spec);
+
+  /**
+   * Produces rows with the configured generator and writes them to the data source.
+   *
+   * @throws Exception if generating or writing the rows fails
+   */
+  void write()
+      throws Exception;
+
+  /**
+   * Removes the data previously written to the data source.
+   */
+  void cleanup();
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java
new file mode 100644
index 0000000000..0a82601fa7
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * Base configuration handed to a {@link Writer} when it is initialised.
+ * It carries the {@link DataGenerator} the writer uses to produce rows. The class is
+ * deliberately not final, so writer-specific settings can be added by subclasses.
+ */
+public class WriterSpec {
+  /** Generator the writer pulls rows from. */
+  private final DataGenerator _generator;
+
+  /**
+   * @param dataGenerator generator the writer will use to produce rows
+   */
+  public WriterSpec(DataGenerator dataGenerator) {
+    this._generator = dataGenerator;
+  }
+
+  /**
+   * @return the {@link DataGenerator} supplied at construction time
+   */
+  public DataGenerator getGenerator() {
+    return this._generator;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
index 9873d17ad8..3c19db74b6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
import org.apache.pinot.controller.recommender.io.metadata.DateTimeFieldSpecMetadata;
@@ -527,11 +528,11 @@ public class MemoryEstimator {
String outputDir = new File(_workingDir, "csv").getAbsolutePath();
DataGeneratorSpec spec =
new DataGeneratorSpec(colNames, cardinalities, new HashMap<>(), new HashMap<>(), mvCounts, lengths, dataTypes,
- fieldTypes, timeUnits, FileFormat.CSV, outputDir, true);
+ fieldTypes, timeUnits);
DataGenerator dataGenerator = new DataGenerator();
try {
dataGenerator.init(spec);
- dataGenerator.generateCsv(_numberOfRows, 1);
+ DataGenerationHelpers.generateCsv(dataGenerator, _numberOfRows, 1, outputDir, true);
File outputFile = Paths.get(outputDir, "output_0.csv").toFile();
LOGGER.info("Successfully generated data file: {}", outputFile);
return outputFile;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
index faded1dd49..9a98f652fb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
@@ -26,17 +26,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.math.IntRange;
+import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
import org.apache.pinot.controller.recommender.data.generator.SchemaAnnotation;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.Schema.SchemaBuilder;
-import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Command;
import org.slf4j.Logger;
@@ -141,18 +139,18 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
buildCardinalityRangeMaps(_schemaAnnFile, cardinality, range, pattern);
final DataGeneratorSpec spec =
- buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern,
- mvCountMap, lengthMap);
+ DataGenerationHelpers.buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality,
+ range, pattern, mvCountMap, lengthMap);
final DataGenerator gen = new DataGenerator();
gen.init(spec);
if (FORMAT_AVRO.equalsIgnoreCase(_format)) {
- gen.generateAvro(_numRecords, _numFiles);
+ DataGenerationHelpers.generateAvro(gen, _numRecords, _numFiles, _outDir, _overwrite);
} else if (FORMAT_CSV.equalsIgnoreCase(_format)) {
- gen.generateCsv(_numRecords, _numFiles);
+ DataGenerationHelpers.generateCsv(gen, _numRecords, _numFiles, _outDir, _overwrite);
} else if (FORMAT_JSON.equalsIgnoreCase(_format)) {
- gen.generateJson(_numRecords, _numFiles);
+ DataGenerationHelpers.generateJson(gen, _numRecords, _numFiles, _outDir, _overwrite);
} else {
throw new IllegalArgumentException(String.format("Invalid output format '%s'", _format));
}
@@ -182,52 +180,6 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
}
}
- private DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
- HashMap<String, DataType> dataTypes, HashMap<String, FieldType> fieldTypes, HashMap<String, TimeUnit> timeUnits,
- HashMap<String, Integer> cardinality, HashMap<String, IntRange> range,
- HashMap<String, Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
- for (final FieldSpec fs : schema.getAllFieldSpecs()) {
- String col = fs.getName();
-
- columns.add(col);
- dataTypes.put(col, fs.getDataType());
- fieldTypes.put(col, fs.getFieldType());
-
- switch (fs.getFieldType()) {
- case DIMENSION:
- if (cardinality.get(col) == null) {
- cardinality.put(col, 1000);
- }
- break;
-
- case METRIC:
- if (!range.containsKey(col)) {
- range.put(col, new IntRange(1, 1000));
- }
- break;
-
- case TIME:
- if (!range.containsKey(col)) {
- range.put(col, new IntRange(1, 1000));
- }
- TimeFieldSpec tfs = (TimeFieldSpec) fs;
- timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType());
- break;
-
- // forward compatibility with pattern generator
- case DATE_TIME:
- case COMPLEX:
- break;
-
- default:
- throw new RuntimeException("Invalid field type.");
- }
- }
-
- return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes,
- timeUnits, FileFormat.AVRO, _outDir, _overwrite);
- }
-
public static void main(String[] args)
throws IOException {
SchemaBuilder schemaBuilder = new SchemaBuilder();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org