You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/12/26 07:51:21 UTC

(pinot) branch master updated: Data generator reorganisation (#12122)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a20031528d Data generator reorganisation (#12122)
a20031528d is described below

commit a20031528da3c29371f3122eccf745061905a00d
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Tue Dec 26 13:21:16 2023 +0530

    Data generator reorganisation (#12122)
---
 .../recommender/data/DataGenerationHelpers.java    | 119 +++++++++++++++++++++
 .../recommender/data/generator/DataGenerator.java  |  88 +++------------
 .../data/generator/DataGeneratorSpec.java          |  28 ++++-
 .../data/{generator => writer}/AvroWriter.java     |  72 ++++++++++---
 .../recommender/data/writer/AvroWriterSpec.java    |  56 ++++++++++
 .../recommender/data/writer/CsvWriter.java         |  52 +++++++++
 .../recommender/data/writer/FileWriter.java        |  74 +++++++++++++
 .../recommender/data/writer/FileWriterSpec.java    |  47 ++++++++
 .../recommender/data/writer/JsonWriter.java        |  43 ++++++++
 .../controller/recommender/data/writer/Writer.java |  49 +++++++++
 .../recommender/data/writer/WriterSpec.java        |  34 ++++++
 .../realtime/provisioning/MemoryEstimator.java     |   5 +-
 .../tools/admin/command/GenerateDataCommand.java   |  60 ++---------
 13 files changed, 577 insertions(+), 150 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java
new file mode 100644
index 0000000000..b4017abfa8
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.recommender.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.math.IntRange;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
+import org.apache.pinot.controller.recommender.data.writer.AvroWriter;
+import org.apache.pinot.controller.recommender.data.writer.AvroWriterSpec;
+import org.apache.pinot.controller.recommender.data.writer.CsvWriter;
+import org.apache.pinot.controller.recommender.data.writer.FileWriterSpec;
+import org.apache.pinot.controller.recommender.data.writer.JsonWriter;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeFieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class DataGenerationHelpers {
+
+  private DataGenerationHelpers() {
+  }
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerationHelpers.class);
+
+  public static void generateAvro(DataGenerator generator, long totalDocs, int numFiles, String outDir,
+      boolean isOverrideOutDir) throws Exception {
+    AvroWriter avroWriter = new AvroWriter();
+    avroWriter.init(new AvroWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles));
+    avroWriter.write();
+  }
+
+  public static void generateCsv(DataGenerator generator, long totalDocs, int numFiles, String outDir,
+      boolean isOverrideOutDir) throws Exception {
+    CsvWriter csvWriter = new CsvWriter();
+    csvWriter.init(new FileWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles));
+    csvWriter.write();
+  }
+
+  public static void generateJson(DataGenerator generator, long totalDocs, int numFiles, String outDir,
+      boolean isOverrideOutDir) throws Exception {
+    JsonWriter jsonWriter = new JsonWriter();
+    jsonWriter.init(new FileWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles));
+    jsonWriter.write();
+  }
+
+  /** Returns a fresh, empty {@code outDir}; an existing directory is replaced only when override is allowed. */
+  private static File handleOutDir(String outDir, boolean isOverrideOutDir)
+      throws IOException {
+    File dir = new File(outDir);
+    if (dir.exists() && !isOverrideOutDir) {
+      LOGGER.error("output directory already exists, and override is set to false");
+      throw new RuntimeException("output directory exists");
+    }
+    FileUtils.deleteDirectory(dir); // no-op when the directory does not exist
+    // Unlike mkdir(), forceMkdir creates missing parents and throws IOException on failure.
+    FileUtils.forceMkdir(dir);
+    return dir;
+  }
+
+  public static DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
+      HashMap<String, FieldSpec.DataType> dataTypes, HashMap<String, FieldSpec.FieldType> fieldTypes,
+      HashMap<String, TimeUnit> timeUnits, HashMap<String, Integer> cardinality, HashMap<String, IntRange> range,
+      HashMap<String, Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
+    for (final FieldSpec fs : schema.getAllFieldSpecs()) {
+      String col = fs.getName();
+      columns.add(col);
+      dataTypes.put(col, fs.getDataType());
+      fieldTypes.put(col, fs.getFieldType());
+
+      switch (fs.getFieldType()) {
+        case DIMENSION:
+          cardinality.putIfAbsent(col, 1000);
+          break;
+        case METRIC:
+          range.putIfAbsent(col, new IntRange(1, 1000));
+          break;
+        case TIME:
+          range.putIfAbsent(col, new IntRange(1, 1000));
+          TimeFieldSpec tfs = (TimeFieldSpec) fs;
+          timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType());
+          break;
+
+        // forward compatibility with pattern generator
+        case DATE_TIME:
+        case COMPLEX:
+          break;
+        default:
+          throw new RuntimeException("Invalid field type.");
+      }
+    }
+    return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes,
+        timeUnits);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java
index 8069813a02..4c818846ed 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java
@@ -18,19 +18,16 @@
  */
 package org.apache.pinot.controller.recommender.data.generator;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.IntRange;
+import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -39,7 +36,6 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.FileFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +46,6 @@ import org.slf4j.LoggerFactory;
 // TODO: add DATE_TIME to the data generator
 public class DataGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerator.class);
-  private File _outDir;
 
   DataGeneratorSpec _genSpec;
 
@@ -63,17 +58,6 @@ public class DataGenerator {
   public void init(DataGeneratorSpec spec)
       throws IOException {
     _genSpec = spec;
-    _outDir = new File(_genSpec.getOutputDir());
-    if (_outDir.exists() && !_genSpec.isOverrideOutDir()) {
-      LOGGER.error("output directory already exists, and override is set to false");
-      throw new RuntimeException("output directory exists");
-    }
-
-    if (_outDir.exists()) {
-      FileUtils.deleteDirectory(_outDir);
-    }
-
-    _outDir.mkdir();
 
     for (final String column : _genSpec.getColumns()) {
       DataType dataType = _genSpec.getDataTypeMap().get(column);
@@ -99,59 +83,17 @@ public class DataGenerator {
     }
   }
 
-  public void generateAvro(long totalDocs, int numFiles)
-      throws IOException {
-    final int numPerFiles = (int) (totalDocs / numFiles);
-    for (int i = 0; i < numFiles; i++) {
-      try (AvroWriter writer = new AvroWriter(_outDir, i, _generators, fetchSchema())) {
-        for (int j = 0; j < numPerFiles; j++) {
-          writer.writeNext();
-        }
-      }
-    }
-  }
-
-  public void generateCsv(long totalDocs, int numFiles)
-      throws IOException {
-    final int numPerFiles = (int) (totalDocs / numFiles);
-    for (int i = 0; i < numFiles; i++) {
-      try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.csv", i)))) {
-        writer.append(StringUtils.join(_genSpec.getColumns(), ",")).append('\n');
-        for (int j = 0; j < numPerFiles; j++) {
-          Object[] values = new Object[_genSpec.getColumns().size()];
-          for (int k = 0; k < _genSpec.getColumns().size(); k++) {
-            Object next = _generators.get(_genSpec.getColumns().get(k)).next();
-            values[k] = serializeIfMultiValue(next);
-          }
-          writer.append(StringUtils.join(values, ",")).append('\n');
-        }
-      }
-    }
-  }
-
-  public void generateJson(long totalDocs, int numFiles)
-      throws IOException {
-    final int numPerFiles = (int) (totalDocs / numFiles);
-    final ObjectMapper mapper = new ObjectMapper();
-    for (int i = 0; i < numFiles; i++) {
-      try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.json", i)))) {
-        for (int j = 0; j < numPerFiles; j++) {
-          Map<String, Object> row = new HashMap<>();
-          for (int k = 0; k < _genSpec.getColumns().size(); k++) {
-            String key = _genSpec.getColumns().get(k);
-            row.put(key, _generators.get(key).next());
-          }
-          writer.append(mapper.writeValueAsString(row)).append('\n');
-        }
-      }
-    }
-  }
-
-  private Object serializeIfMultiValue(Object obj) {
-    if (obj instanceof List) {
-      return StringUtils.join((List) obj, ";");
+  /**
+   * Returns a LinkedHashMap of columns and their respective generated values.
+   * Using a LinkedHashMap ensures that the entries are ordered as per the column list.
+   * Each call draws one new value from every column's generator.
+   */
+  public Map<String, Object> nextRow() {
+    Map<String, Object> row = new LinkedHashMap<>();
+    for (String key : _genSpec.getColumns()) {
+      row.put(key, _generators.get(key).next());
     }
-    return obj;
+    return row;
   }
 
   public Schema fetchSchema() {
@@ -193,7 +135,7 @@ public class DataGenerator {
   }
 
   public static void main(String[] args)
-      throws IOException {
+      throws Exception {
 
     final Map<String, DataType> dataTypes = new HashMap<>();
     final Map<String, FieldType> fieldTypes = new HashMap<>();
@@ -257,11 +199,11 @@ public class DataGenerator {
     String outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "csv-data").toString();
     final DataGeneratorSpec spec =
         new DataGeneratorSpec(columnNames, cardinality, range, template, mvCountMap, lengthMap, dataTypes, fieldTypes,
-            timeUnits, FileFormat.CSV, outputDir, true);
+            timeUnits);
 
     final DataGenerator gen = new DataGenerator();
     gen.init(spec);
-    gen.generateCsv(100, 1);
+    DataGenerationHelpers.generateCsv(gen, 100, 1, outputDir, true);
     System.out.println("CSV data is generated under: " + outputDir);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java
index d89ea66968..f64a7a984b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java
@@ -42,15 +42,20 @@ public class DataGeneratorSpec {
   private final Map<String, FieldType> _fieldTypeMap;
   private final Map<String, TimeUnit> _timeUnitMap;
 
-  private final FileFormat _outputFileFormat;
-  private final String _outputDir;
-  private final boolean _overrideOutDir;
-
+  @Deprecated
+  private FileFormat _outputFileFormat;
+  @Deprecated
+  private String _outputDir;
+  @Deprecated
+  private boolean _overrideOutDir;
+
+  @Deprecated
   public DataGeneratorSpec() {
     this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
         new HashMap<>(), new HashMap<>(), new HashMap<>(), FileFormat.AVRO, "/tmp/dataGen", true);
   }
 
+  @Deprecated
   public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
       Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap,
       Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
@@ -71,6 +76,21 @@ public class DataGeneratorSpec {
     _timeUnitMap = timeUnitMap;
   }
 
+  public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
+      Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap,
+      Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap) {
+    _columns = columns;
+    _cardinalityMap = cardinalityMap;
+    _rangeMap = rangeMap;
+    _patternMap = patternMap;
+    _mvCountMap = mvCountMap;
+    _lengthMap = lengthMap;
+
+    _dataTypeMap = dataTypesMap;
+    _fieldTypeMap = fieldTypesMap;
+    _timeUnitMap = timeUnitMap;
+  }
+
   public Map<String, DataType> getDataTypeMap() {
     return _dataTypeMap;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java
similarity index 59%
rename from pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java
rename to pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java
index 92bcf3be4d..1295b8991d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.controller.recommender.data.generator;
+package org.apache.pinot.controller.recommender.data.writer;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -25,6 +25,7 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -32,20 +33,13 @@ import org.apache.pinot.plugin.inputformat.avro.AvroSchemaUtil;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class AvroWriter implements Closeable {
-  private final Map<String, Generator> _generatorMap;
-  private final org.apache.avro.Schema _avroSchema;
-  private final DataFileWriter<GenericData.Record> _recordWriter;
-
-  public AvroWriter(File baseDir, int index, Map<String, Generator> generatorMap, Schema schema)
-      throws IOException {
-    _generatorMap = generatorMap;
-    _avroSchema = getAvroSchema(schema);
-    _recordWriter = new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(_avroSchema));
-    _recordWriter.create(_avroSchema, new File(baseDir, "part-" + index + ".avro"));
-  }
+public class AvroWriter implements Writer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class);
+  private AvroWriterSpec _spec;
 
   public static org.apache.avro.Schema getAvroSchema(Schema schema) {
     ObjectNode avroSchema = JsonUtils.newObjectNode();
@@ -62,12 +56,56 @@ public class AvroWriter implements Closeable {
     return new org.apache.avro.Schema.Parser().parse(avroSchema.toString());
   }
 
-  public void writeNext()
+  @Override
+  public void init(WriterSpec spec) {
+    _spec = (AvroWriterSpec) spec;
+  }
+
+  @Override
+  public void write()
       throws IOException {
-    GenericData.Record nextRecord = new GenericData.Record(_avroSchema);
-    for (String column : _generatorMap.keySet()) {
-      nextRecord.put(column, _generatorMap.get(column).next());
+    final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles());
+    for (int i = 0; i < _spec.getNumFiles(); i++) {
+      try (AvroRecordAppender appender = new AvroRecordAppender(
+          new File(_spec.getBaseDir(), "part-" + i + ".avro"), getAvroSchema(_spec.getSchema()))) {
+        for (int j = 0; j < numPerFiles; j++) {
+          appender.append(_spec.getGenerator().nextRow());
+        }
+      }
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    File baseDir = new File(_spec.getBaseDir().toURI());
+    for (File file : Objects.requireNonNull(baseDir.listFiles())) {
+      if (!file.delete()) {
+        LOGGER.error("Unable to delete file {}", file.getAbsolutePath());
+      }
+    }
+    if (!baseDir.delete()) {
+      LOGGER.error("Unable to delete directory {}", baseDir.getAbsolutePath());
     }
+  }
+}
+
+class AvroRecordAppender implements Closeable {
+  private final DataFileWriter<GenericData.Record> _recordWriter;
+  private final org.apache.avro.Schema _avroSchema;
+
+  public AvroRecordAppender(File file, org.apache.avro.Schema avroSchema)
+      throws IOException {
+    _avroSchema = avroSchema;
+    _recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(_avroSchema));
+    _recordWriter.create(_avroSchema, file);
+  }
+
+  /** Copies each column/value pair of {@code record} into a new Avro record and appends it to the file. */
+  public void append(Map<String, Object> record)
+      throws IOException {
+    GenericData.Record nextRecord = new GenericData.Record(_avroSchema);
+    // forEach already supplies the value, so there is no need to look it up in the map again.
+    record.forEach((column, value) -> nextRecord.put(column, value));
     _recordWriter.append(nextRecord);
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java
new file mode 100644
index 0000000000..a9ce271130
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.io.File;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class AvroWriterSpec extends WriterSpec {
+
+  private final File _baseDir;
+  private final long _totalDocs;
+  private final int _numFiles;
+  private final Schema _schema;
+
+  public AvroWriterSpec(DataGenerator generator, File baseDir, long totalDocs, int numFiles) {
+    super(generator);
+    _baseDir = baseDir;
+    _totalDocs = totalDocs;
+    _numFiles = numFiles;
+    _schema = generator.fetchSchema();
+  }
+
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  public long getTotalDocs() {
+    return _totalDocs;
+  }
+
+  public int getNumFiles() {
+    return _numFiles;
+  }
+
+  public File getBaseDir() {
+    return _baseDir;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
new file mode 100644
index 0000000000..88547cc757
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * Writes generated rows as comma-separated lines; the values of a multi-value column are joined with ';'.
+ */
+public class CsvWriter extends FileWriter {
+  @Override
+  protected String generateRow(DataGenerator generator) {
+    Map<String, Object> row = generator.nextRow();
+    Object[] values = new Object[row.size()];
+    int index = 0;
+    // Iterate the values directly, in the row's own order, instead of looking every key up again.
+    for (Object value : row.values()) {
+      values[index++] = serializeIfMultiValue(value);
+    }
+    return StringUtils.join(values, ",");
+  }
+
+  // A List (multi-value cell) is flattened to "a;b;c"; any other value is returned unchanged.
+  private Object serializeIfMultiValue(Object obj) {
+    return obj instanceof List ? StringUtils.join((List<?>) obj, ";") : obj;
+  }
+
+  @Override
+  protected String getExtension() {
+    return "csv";
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
new file mode 100644
index 0000000000..b74c2ebbde
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.io.File;
+import java.util.Objects;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class FileWriter implements Writer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileWriter.class);
+
+  private FileWriterSpec _spec;
+  @Override
+  public void init(WriterSpec spec) {
+    _spec = (FileWriterSpec) spec;
+  }
+
+  @Override
+  public void write()
+      throws Exception {
+    final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles());
+    final String headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ",");
+    final String extension = getExtension() == null ? "" : String.format(".%s", getExtension());
+    for (int i = 0; i < _spec.getNumFiles(); i++) {
+      try (java.io.FileWriter writer =
+          new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", i, extension)))) {
+        writer.append(headers).append('\n');
+        for (int j = 0; j < numPerFiles; j++) {
+          String appendString = generateRow(_spec.getGenerator());
+          writer.append(appendString).append('\n');
+        }
+      }
+    }
+  }
+
+  protected String getExtension() {
+    return null;
+  }
+
+  @Override
+  public void cleanup() {
+    File baseDir = new File(_spec.getBaseDir().toURI());
+    for (File file : Objects.requireNonNull(baseDir.listFiles())) {
+      if (!file.delete()) {
+        LOGGER.error("Unable to delete file {}", file.getAbsolutePath());
+      }
+    }
+    if (!baseDir.delete()) {
+      LOGGER.error("Unable to delete directory {}", baseDir.getAbsolutePath());
+    }
+  }
+
+  protected abstract String generateRow(DataGenerator generator);
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java
new file mode 100644
index 0000000000..a7ef8cd496
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import java.io.File;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+public class FileWriterSpec extends WriterSpec {
+  private final File _baseDir;
+  private final long _totalDocs;
+  private final int _numFiles;
+  public FileWriterSpec(DataGenerator generator, File baseDir, long totalDocs, int numFiles) {
+    super(generator);
+    _baseDir = baseDir;
+    _totalDocs = totalDocs;
+    _numFiles = numFiles;
+  }
+
+  public File getBaseDir() {
+    return _baseDir;
+  }
+
+  public long getTotalDocs() {
+    return _totalDocs;
+  }
+
+  public int getNumFiles() {
+    return _numFiles;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java
new file mode 100644
index 0000000000..f867df9f29
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+public class JsonWriter extends FileWriter {
+  @Override
+  protected String generateRow(DataGenerator generator) {
+    Map<String, Object> row = generator.nextRow();
+    final ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.writeValueAsString(row);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Issue while processing the json entry.", e);
+    }
+  }
+
+  @Override
+  protected String getExtension() {
+    return "json";
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java
new file mode 100644
index 0000000000..b64aef01ae
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * Contract for components that push generated data into a datasource.
+ * An implementation obtains its rows from a {@link DataGenerator} and persists them
+ * to whichever datasource it targets.
+ */
+public interface Writer {
+
+  /**
+   * Prepares this writer for use.
+   * @param spec {@link WriterSpec} carrying the {@link DataGenerator} that supplies
+   *             the rows to be written to the datasource
+   */
+  void init(WriterSpec spec);
+
+  /**
+   * Emits the generated rows to the configured datasource.
+   * @throws Exception on any failure while writing
+   */
+  void write() throws Exception;
+
+  /**
+   * Removes the data that was written to the datasource.
+   */
+  void cleanup();
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java
new file mode 100644
index 0000000000..0a82601fa7
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.data.writer;
+
+import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
+
+
+/**
+ * Configuration object handed to a {@link Writer}; it carries the {@link DataGenerator}
+ * from which the writer obtains its rows.
+ */
+public class WriterSpec {
+  private final DataGenerator _generator;
+
+  /**
+   * @param generator the generator to expose through {@link #getGenerator()}
+   */
+  public WriterSpec(DataGenerator generator) {
+    this._generator = generator;
+  }
+
+  /**
+   * @return the generator supplied at construction time
+   */
+  public DataGenerator getGenerator() {
+    return this._generator;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
index 9873d17ad8..3c19db74b6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
 import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
 import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
 import org.apache.pinot.controller.recommender.io.metadata.DateTimeFieldSpecMetadata;
@@ -527,11 +528,11 @@ public class MemoryEstimator {
       String outputDir = new File(_workingDir, "csv").getAbsolutePath();
       DataGeneratorSpec spec =
           new DataGeneratorSpec(colNames, cardinalities, new HashMap<>(), new HashMap<>(), mvCounts, lengths, dataTypes,
-              fieldTypes, timeUnits, FileFormat.CSV, outputDir, true);
+              fieldTypes, timeUnits);
       DataGenerator dataGenerator = new DataGenerator();
       try {
         dataGenerator.init(spec);
-        dataGenerator.generateCsv(_numberOfRows, 1);
+        DataGenerationHelpers.generateCsv(dataGenerator, _numberOfRows, 1, outputDir, true);
         File outputFile = Paths.get(outputDir, "output_0.csv").toFile();
         LOGGER.info("Successfully generated data file: {}", outputFile);
         return outputFile;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
index faded1dd49..9a98f652fb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java
@@ -26,17 +26,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.math.IntRange;
+import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
 import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
 import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
 import org.apache.pinot.controller.recommender.data.generator.SchemaAnnotation;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.Schema.SchemaBuilder;
-import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Command;
 import org.slf4j.Logger;
@@ -141,18 +139,18 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
     buildCardinalityRangeMaps(_schemaAnnFile, cardinality, range, pattern);
 
     final DataGeneratorSpec spec =
-        buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern,
-            mvCountMap, lengthMap);
+        DataGenerationHelpers.buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality,
+            range, pattern, mvCountMap, lengthMap);
 
     final DataGenerator gen = new DataGenerator();
     gen.init(spec);
 
     if (FORMAT_AVRO.equalsIgnoreCase(_format)) {
-      gen.generateAvro(_numRecords, _numFiles);
+      DataGenerationHelpers.generateAvro(gen, _numRecords, _numFiles, _outDir, _overwrite);
     } else if (FORMAT_CSV.equalsIgnoreCase(_format)) {
-      gen.generateCsv(_numRecords, _numFiles);
+      DataGenerationHelpers.generateCsv(gen, _numRecords, _numFiles, _outDir, _overwrite);
     } else if (FORMAT_JSON.equalsIgnoreCase(_format)) {
-      gen.generateJson(_numRecords, _numFiles);
+      DataGenerationHelpers.generateJson(gen, _numRecords, _numFiles, _outDir, _overwrite);
     } else {
       throw new IllegalArgumentException(String.format("Invalid output format '%s'", _format));
     }
@@ -182,52 +180,6 @@ public class GenerateDataCommand extends AbstractBaseAdminCommand implements Com
     }
   }
 
-  private DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
-      HashMap<String, DataType> dataTypes, HashMap<String, FieldType> fieldTypes, HashMap<String, TimeUnit> timeUnits,
-      HashMap<String, Integer> cardinality, HashMap<String, IntRange> range,
-      HashMap<String, Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
-    for (final FieldSpec fs : schema.getAllFieldSpecs()) {
-      String col = fs.getName();
-
-      columns.add(col);
-      dataTypes.put(col, fs.getDataType());
-      fieldTypes.put(col, fs.getFieldType());
-
-      switch (fs.getFieldType()) {
-        case DIMENSION:
-          if (cardinality.get(col) == null) {
-            cardinality.put(col, 1000);
-          }
-          break;
-
-        case METRIC:
-          if (!range.containsKey(col)) {
-            range.put(col, new IntRange(1, 1000));
-          }
-          break;
-
-        case TIME:
-          if (!range.containsKey(col)) {
-            range.put(col, new IntRange(1, 1000));
-          }
-          TimeFieldSpec tfs = (TimeFieldSpec) fs;
-          timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType());
-          break;
-
-        // forward compatibility with pattern generator
-        case DATE_TIME:
-        case COMPLEX:
-          break;
-
-        default:
-          throw new RuntimeException("Invalid field type.");
-      }
-    }
-
-    return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes,
-        timeUnits, FileFormat.AVRO, _outDir, _overwrite);
-  }
-
   public static void main(String[] args)
       throws IOException {
     SchemaBuilder schemaBuilder = new SchemaBuilder();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org