Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/09 08:34:01 UTC

[incubator-pinot] branch adding_pinot_minion_segment_creation_tasks_2 created (now f7c9ddb)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch adding_pinot_minion_segment_creation_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at f7c9ddb  Adding pinot minion segment creation task

This branch includes the following new commits:

     new 33028d5  simplify batch config and corresponding utils
     new f7c9ddb  Adding pinot minion segment creation task

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/02: simplify batch config and corresponding utils

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 33028d53943fbfcf934c401aca670ddf61b8593c
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Dec 7 22:40:40 2020 -0800

    simplify batch config and corresponding utils
---
 .../pinot/core/util/TableConfigUtilsTest.java      |  19 ++--
 .../pinot/spi/filesystem/PinotFSFactory.java       |   2 +-
 .../pinot/spi/ingestion/batch/BatchConfig.java     |  82 +++++++-------
 .../spi/ingestion/batch/BatchConfigProperties.java |  49 ++++++---
 .../pinot/spi/utils/IngestionConfigUtils.java      |  42 +++++++-
 .../pinot/spi/ingestion/batch/BatchConfigTest.java | 119 ++++++---------------
 6 files changed, 150 insertions(+), 163 deletions(-)
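
For reference, the simplification below replaces the per-batch-type keys
(built via BatchConfigProperties.constructBatchProperty as
"batch.<type>.<property>") with flat, input/output-scoped keys. A minimal
sketch of the new-style map, with illustrative values mirroring the test
change that follows:

    Map<String, String> batchConfigMap = new HashMap<>();
    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");        // "inputDirURI"
    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");       // "outputDirURI"
    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, "org.foo.S3FS");   // "input.fs.className"
    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, "org.foo.GcsFS"); // "output.fs.className"
    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");             // "inputFormat"
    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, "org.foo.Reader");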

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index ae313c0..9e1828d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -377,16 +377,12 @@ public class TableConfigUtilsTest {
   @Test
   public void ingestionBatchConfigsTest() {
     Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, "s3");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_DIR_URI), "s3://foo");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.OUTPUT_DIR_URI), "s3://bar");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.FS_CLASS), "org.foo.S3FS");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_FORMAT), "avro");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.RECORD_READER_CLASS),
-        "org.foo.Reader");
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, "org.foo.S3FS");
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, "org.foo.GcsFS");
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, "org.foo.Reader");
 
     IngestionConfig ingestionConfig =
         new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, null),
@@ -395,8 +391,7 @@ public class TableConfigUtilsTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
             .build();
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
-
-    batchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
+    batchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for invalid batch config map");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index b163f5c..6366d95 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -38,8 +38,8 @@ public class PinotFSFactory {
   private PinotFSFactory() {
   }
 
+  public static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class);
-  private static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final String CLASS = "class";
   private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() {
     {
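
The one functional change in PinotFSFactory is visibility: LOCAL_PINOT_FS_SCHEME
becomes public so that callers (such as the minion task executor in the second
commit of this thread) can fall back to the local filesystem for scheme-less
URIs. A sketch of that pattern, as used later in this thread:

    String inputPath = "/tmp/data/input.csv"; // illustrative local path
    URI inputFileURI = URI.create(inputPath);
    String scheme = inputFileURI.getScheme();
    if (scheme == null) {
      scheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME; // "file"
    }
    PinotFS fs = PinotFSFactory.create(scheme);
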
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index 965eb9a..eeb4001 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -28,62 +28,50 @@ import org.apache.pinot.spi.data.readers.FileFormat;
  * Provides all config related to the batch data source, as configured in the table config's ingestion config
  */
 public class BatchConfig {
+  private final Map<String, String> _batchConfigMap;
+
   private final String _tableNameWithType;
-  private final String _type;
   private final String _inputDirURI;
   private final String _outputDirURI;
-  private final String _fsClassName;
-  private final Map<String, String> _fsProps = new HashMap<>();
+  private final String _inputFsClassName;
+  private final Map<String, String> _inputFsProps = new HashMap<>();
+  private final String _outputFsClassName;
+  private final Map<String, String> _outputFsProps = new HashMap<>();
   private final FileFormat _inputFormat;
   private final String _recordReaderClassName;
   private final String _recordReaderConfigClassName;
   private final Map<String, String> _recordReaderProps = new HashMap<>();
 
-  private final Map<String, String> _batchConfigMap = new HashMap<>();
-
   public BatchConfig(String tableNameWithType, Map<String, String> batchConfigsMap) {
-    _tableNameWithType = tableNameWithType;
-
-    _type = batchConfigsMap.get(BatchConfigProperties.BATCH_TYPE);
-    Preconditions.checkState(_type != null, "Property: %s cannot be null", BatchConfigProperties.BATCH_TYPE);
+    _batchConfigMap = batchConfigsMap;
 
-    String inputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_DIR_URI);
-    _inputDirURI = batchConfigsMap.get(inputDirURIKey);
-    Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", inputDirURIKey);
+    _tableNameWithType = tableNameWithType;
 
-    String outputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.OUTPUT_DIR_URI);
-    _outputDirURI = batchConfigsMap.get(outputDirURIKey);
-    Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", outputDirURIKey);
+    _inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
+    Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_DIR_URI);
 
-    String fsClassNameKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_CLASS);
-    _fsClassName = batchConfigsMap.get(fsClassNameKey);
-    Preconditions.checkState(_fsClassName != null, "Property: %s cannot be null", fsClassNameKey);
+    _outputDirURI = batchConfigsMap.get(BatchConfigProperties.OUTPUT_DIR_URI);
+    Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.OUTPUT_DIR_URI);
 
-    String inputFormatKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_FORMAT);
-    String inputFormat = batchConfigsMap.get(inputFormatKey);
-    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", inputFormat);
+    _inputFsClassName = batchConfigsMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+    _outputFsClassName = batchConfigsMap.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+    String inputFormat = batchConfigsMap.get(BatchConfigProperties.INPUT_FORMAT);
+    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_FORMAT);
     _inputFormat = FileFormat.valueOf(inputFormat.toUpperCase());
 
-    String recordReaderClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CLASS);
-    _recordReaderClassName = batchConfigsMap.get(recordReaderClassNameKey);
-    Preconditions.checkState(_recordReaderClassName != null, "Property: %s cannot be null", recordReaderClassNameKey);
-
-    String recordReaderConfigClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
-    _recordReaderConfigClassName = batchConfigsMap.get(recordReaderConfigClassNameKey);
-
-    String fsPropPrefix = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_PROP_PREFIX);
-    String recordReaderPropPrefix =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_PROP_PREFIX);
+    _recordReaderClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CLASS);
+    _recordReaderConfigClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
     for (Map.Entry<String, String> entry : batchConfigsMap.entrySet()) {
       String key = entry.getKey();
-      if (key.startsWith(fsPropPrefix)) {
-        _fsProps.put(key, entry.getValue());
-      } else if (key.startsWith(recordReaderPropPrefix)) {
+      if (key.startsWith(BatchConfigProperties.INPUT_FS_PROP_PREFIX)) {
+        _inputFsProps.put(key, entry.getValue());
+      }
+      if (key.startsWith(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX)) {
+        _outputFsProps.put(key, entry.getValue());
+      }
+      if (key.startsWith(BatchConfigProperties.RECORD_READER_PROP_PREFIX)) {
         _recordReaderProps.put(key, entry.getValue());
       }
-      _batchConfigMap.put(key, entry.getValue());
     }
   }
 
@@ -91,10 +79,6 @@ public class BatchConfig {
     return _tableNameWithType;
   }
 
-  public String getType() {
-    return _type;
-  }
-
   public String getInputDirURI() {
     return _inputDirURI;
   }
@@ -103,12 +87,20 @@ public class BatchConfig {
     return _outputDirURI;
   }
 
-  public String getFsClassName() {
-    return _fsClassName;
+  public String getInputFsClassName() {
+    return _inputFsClassName;
+  }
+
+  public Map<String, String> getInputFsProps() {
+    return _inputFsProps;
+  }
+
+  public String getOutputFsClassName() {
+    return _outputFsClassName;
   }
 
-  public Map<String, String> getFsProps() {
-    return _fsProps;
+  public Map<String, String> getOutputFsProps() {
+    return _outputFsProps;
   }
 
   public FileFormat getInputFormat() {
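
With this change a BatchConfig is built directly from the flat map: only
inputDirURI, outputDirURI and inputFormat remain mandatory (the fs-class and
record-reader checks are gone), and the input/output filesystem properties are
collected into separate maps keyed by their prefixes. A small usage sketch
(table name and values illustrative):

    BatchConfig batchConfig = new BatchConfig("myTable_OFFLINE", batchConfigMap);
    String inputFsClass = batchConfig.getInputFsClassName();            // may be null: no longer checked
    Map<String, String> inputFsProps = batchConfig.getInputFsProps();   // keys starting with "input.fs.prop"
    Map<String, String> outputFsProps = batchConfig.getOutputFsProps(); // keys starting with "output.fs.prop"
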
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5d276aa..2d83335 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -18,33 +18,52 @@
  */
 package org.apache.pinot.spi.ingestion.batch;
 
-import org.apache.commons.lang3.StringUtils;
-
-
 /**
  * Defines all the keys used in the batch configs map
  */
 public class BatchConfigProperties {
 
-  public static final String DOT_SEPARATOR = ".";
-  public static final String BATCH_PREFIX = "batch";
+  public static final String TABLE_CONFIGS = "tableConfigs";
+  public static final String TABLE_NAME = "tableName";
 
-  public static final String BATCH_TYPE = "batchType";
   public static final String INPUT_DIR_URI = "inputDirURI";
   public static final String OUTPUT_DIR_URI = "outputDirURI";
-  public static final String FS_CLASS = "fs.className";
-  public static final String FS_PROP_PREFIX = "fs.prop";
+  public static final String INPUT_FS_CLASS = "input.fs.className";
+  public static final String INPUT_FS_PROP_PREFIX = "input.fs.prop";
+  public static final String OUTPUT_FS_CLASS = "output.fs.className";
+  public static final String OUTPUT_FS_PROP_PREFIX = "output.fs.prop";
   public static final String INPUT_FORMAT = "inputFormat";
+  public static final String INCLUDE_FILE_NAME_PATTERN = "includeFileNamePattern";
+  public static final String EXCLUDE_FILE_NAME_PATTERN = "excludeFileNamePattern";
   public static final String RECORD_READER_CLASS = "recordReader.className";
-  public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
+  public static final String RECORD_READER_CONFIG_CLASS = "recordReader.configClassName";
   public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
-
+  public static final String SCHEMA = "schema";
+  public static final String SCHEMA_URI = "schemaURI";
+  public static final String SEQUENCE_ID = "sequenceId";
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = "segmentNameGenerator.type";
+  public static final String SEGMENT_NAME_GENERATOR_CONFIGS = "segmentNameGenerator.configs";
+  public static final String OVERWRITE_OUTPUT = "overwriteOutput";
   public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
-  /**
-   * Helper method to create a batch config property
-   */
-  public static String constructBatchProperty(String batchType, String property) {
-    return StringUtils.join(BATCH_PREFIX, batchType, property, DOT_SEPARATOR);
+  public static final String PUSH_MODE = "push.mode";
+  public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
+  public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
+  public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+
+  public static final String INPUT_FILE_URI = "input.file.uri";
+  public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
+
+  public enum SegmentIngestionType {
+    APPEND, REPLACE
+  }
+
+  public class SegmentNameGeneratorType {
+    public static final String SIMPLE = "simple";
+    public static final String NORMALIZED_DATE = "normalizedDate";
+    public static final String FIXED = "fixed";
   }
 
+  public enum SegmentPushType {
+    TAR, URI, METADATA
+  }
 }
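
The new schema, sequenceId, segmentNameGenerator.* and push.* keys are consumed
by the minion task added in the second commit. A hedged sketch of the extra
entries the task generator attaches to each per-file task config (values
illustrative):

    Map<String, String> taskConfig = new HashMap<>(batchConfigMap);
    taskConfig.put(BatchConfigProperties.INPUT_FILE_URI, "s3://foo/part-0.avro");
    taskConfig.put(BatchConfigProperties.SEQUENCE_ID, "0");
    taskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
        BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);          // "simple"
    taskConfig.put(BatchConfigProperties.PUSH_MODE, "metadata");         // tar | uri | metadata
    taskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI, "http://localhost:9000");
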
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 62499dd..84e0dbe 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -19,17 +19,23 @@
 package org.apache.pinot.spi.utils;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 
 
 /**
  * Helper methods for extracting fields from IngestionConfig in a backward compatible manner
  */
 public final class IngestionConfigUtils {
+  public static final String DOT_SEPARATOR = ".";
+  private static final String DEFAULT_PUSH_MODE = "metadata";
+
   /**
    * Fetches the streamConfig from the given realtime table.
    * First, the ingestionConfigs->stream->streamConfigs will be checked.
@@ -42,8 +48,10 @@ public final class IngestionConfigUtils {
     Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
         "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
     Map<String, String> streamConfigMap = null;
-    if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
-      List<Map<String, String>> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
+      List<Map<String, String>> streamConfigMaps =
+          tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
       Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table");
       streamConfigMap = streamConfigMaps.get(0);
     }
@@ -92,4 +100,34 @@ public final class IngestionConfigUtils {
     return segmentIngestionFrequency;
   }
 
+  public static PinotConfiguration getFsProps(Map<String, String> batchConfigMap) {
+    return new PinotConfiguration(getPropsWithPrefix(batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + DOT_SEPARATOR));
+  }
+
+  public static Map<String, Object> getPropsWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    Map<String, Object> props = new HashMap<>();
+    props.putAll(getConfigMapWithPrefix(batchConfigMap, prefix));
+    return props;
+  }
+
+  public static Map<String, String> getConfigMapWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    Map<String, String> props = new HashMap<>();
+    for (String configKey : batchConfigMap.keySet()) {
+      if (configKey.startsWith(prefix)) {
+        String[] splits = configKey.split(prefix, 2);
+        if (splits.length > 1) {
+          props.put(splits[1], batchConfigMap.get(configKey));
+        }
+      }
+    }
+    return props;
+  }
+
+  public static String getPushMode(Map<String, String> batchConfigMap) {
+    String pushMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE);
+    if (pushMode == null) {
+      pushMode = DEFAULT_PUSH_MODE;
+    }
+    return pushMode;
+  }
 }
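
A quick sketch of the new helpers: getConfigMapWithPrefix returns the matching
entries with the prefix stripped, and getPushMode falls back to "metadata".
(Note that String.split treats the prefix as a regex; the dots match any
character, which includes the literal dot, so these prefixes behave as
intended here.)

    Map<String, String> batchConfigMap = new HashMap<>();
    batchConfigMap.put("input.fs.prop.region", "us-west");
    Map<String, String> fsProps = IngestionConfigUtils.getConfigMapWithPrefix(
        batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR);
    // fsProps => {region=us-west}
    String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap); // "metadata" (default)
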
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
index bc34f1a..cd71177 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
@@ -36,76 +36,56 @@ public class BatchConfigTest {
   public void testBatchConfig() {
     Map<String, String> batchConfigMap = new HashMap<>();
     String tableName = "foo_REALTIME";
-    String batchType = "s3";
     String inputDir = "s3://foo/input";
     String outputDir = "s3://foo/output";
-    String fsClass = "org.apache.S3FS";
+    String inputFsClass = "org.apache.S3FS";
+    String outputFsClass = "org.apache.GcsGS";
     String region = "us-west";
     String username = "foo";
+    String accessKey = "${ACCESS_KEY}";
+    String secretKey = "${SECRET_KEY}";
     String inputFormat = "csv";
     String recordReaderClass = "org.foo.CSVRecordReader";
     String recordReaderConfigClass = "org.foo.CSVRecordReaderConfig";
     String separator = "|";
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, batchType);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI), inputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI), outputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS), fsClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), inputFormat);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS),
-            recordReaderClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS),
-            recordReaderConfigClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region",
-            region);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username",
-        username);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator", separator);
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, inputFsClass);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, outputFsClass);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, inputFormat);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, recordReaderClass);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CONFIG_CLASS, recordReaderConfigClass);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region", region);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username", username);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region", region);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey", accessKey);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey", secretKey);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator", separator);
 
     // config with all the right properties
     BatchConfig batchConfig = new BatchConfig(tableName, batchConfigMap);
-    assertEquals(batchConfig.getType(), batchType);
     assertEquals(batchConfig.getInputDirURI(), inputDir);
     assertEquals(batchConfig.getOutputDirURI(), outputDir);
-    assertEquals(batchConfig.getFsClassName(), fsClass);
+    assertEquals(batchConfig.getInputFsClassName(), inputFsClass);
+    assertEquals(batchConfig.getOutputFsClassName(), outputFsClass);
     assertEquals(batchConfig.getInputFormat(), FileFormat.CSV);
     assertEquals(batchConfig.getRecordReaderClassName(), recordReaderClass);
     assertEquals(batchConfig.getRecordReaderConfigClassName(), recordReaderConfigClass);
-    assertEquals(batchConfig.getFsProps().size(), 2);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region"),
-        region);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username"),
-        username);
+    assertEquals(batchConfig.getInputFsProps().size(), 2);
+    assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region"), region);
+    assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username"), username);
+    assertEquals(batchConfig.getOutputFsProps().size(), 3);
+    assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region"), region);
+    assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey"), accessKey);
+    assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey"), secretKey);
     assertEquals(batchConfig.getRecordReaderProps().size(), 1);
-    assertEquals(batchConfig.getRecordReaderProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator"), separator);
+    assertEquals(batchConfig.getRecordReaderProps().get(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator"),
+        separator);
     assertEquals(batchConfig.getTableNameWithType(), tableName);
 
     // Missing props
     Map<String, String> testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'batchType");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI));
+    testBatchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'inputDirURI");
@@ -114,8 +94,7 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI));
+    testBatchConfigMap.remove(BatchConfigProperties.OUTPUT_DIR_URI);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'outputDirURI");
@@ -124,48 +103,12 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT));
+    testBatchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'inputFormat");
     } catch (IllegalStateException e) {
       // expected
     }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), "moo");
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for incorrect 'inputFormat");
-    } catch (IllegalArgumentException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'recordReaderClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
-    new BatchConfig(tableName, testBatchConfigMap);
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'fsClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
   }
 }




[incubator-pinot] 02/02: Adding pinot minion segment creation task

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f7c9ddb50d9e24cd0f8487162833c478f37a6329
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Dec 9 00:33:26 2020 -0800

    Adding pinot minion segment creation task
---
 .../SegmentGenerationAndPushTaskGenerator.java     | 241 +++++++++++++++++++++
 .../minion/generator/TaskGeneratorRegistry.java    |   1 +
 .../apache/pinot/core/common/MinionConstants.java  |   6 +
 pinot-minion/pom.xml                               |   5 +
 .../executor/SegmentGenerationAndPushResult.java   |  91 ++++++++
 .../SegmentGenerationAndPushTaskExecutor.java      | 187 ++++++++++++++++
 ...egmentGenerationAndPushTaskExecutorFactory.java |   8 +
 .../executor/TaskExecutorFactoryRegistry.java      |   1 +
 .../pinot/tools/BatchQuickstartWithMinion.java     |  35 +++
 .../org/apache/pinot/tools/BootstrapTableTool.java | 136 ++++++++++--
 .../java/org/apache/pinot/tools/Quickstart.java    |  17 +-
 .../tools/admin/command/QuickstartRunner.java      |  21 ++
 .../tools/admin/command/StartMinionCommand.java    |  20 ++
 13 files changed, 743 insertions(+), 26 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
new file mode 100644
index 0000000..0d05472
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
+
+  private final ClusterInfoAccessor _clusterInfoAccessor;
+
+  public SegmentGenerationAndPushTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      // Only generate tasks for OFFLINE tables
+      String offlineTableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() != TableType.OFFLINE) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkNotNull(tableTaskConfig);
+      Map<String, String> taskConfigs =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", offlineTableName);
+
+      // Get max number of tasks for this table
+      int tableMaxNumTasks;
+      String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+      if (tableMaxNumTasksConfig != null) {
+        try {
+          tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+        } catch (Exception e) {
+          tableMaxNumTasks = Integer.MAX_VALUE;
+        }
+      } else {
+        tableMaxNumTasks = Integer.MAX_VALUE;
+      }
+
+      // Generate tasks
+      int tableNumTasks = 0;
+      // Generate up to tableMaxNumTasks tasks each time for each table
+      if (tableNumTasks == tableMaxNumTasks) {
+        break;
+      }
+      String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+      String batchSegmentIngestionFrequency = IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig);
+      BatchIngestionConfig batchIngestionConfig = tableConfig.getIngestionConfig().getBatchIngestionConfig();
+      List<Map<String, String>> batchConfigMaps = batchIngestionConfig.getBatchConfigMaps();
+      for (Map<String, String> batchConfigMap : batchConfigMaps) {
+        try {
+          URI inputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+          URI outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI));
+
+          List<OfflineSegmentZKMetadata> offlineSegmentsMetadata = Collections.emptyList();
+          // For APPEND mode, skip input files that already have segments created from them.
+          if (BatchConfigProperties.SegmentIngestionType.APPEND.name().equalsIgnoreCase(batchSegmentIngestionType)) {
+            offlineSegmentsMetadata = this._clusterInfoAccessor.getOfflineSegmentsMetadata(offlineTableName);
+          }
+          List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI,
+              getExistingSegmentInputFiles(offlineSegmentsMetadata));
+
+          String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
+          for (URI inputFileURI : inputFileURIs) {
+            Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(batchConfigMap);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.INPUT_FILE_URI, inputFileURI.toString());
+            URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
+            singleFileGenerationTaskConfig
+                .put(BatchConfigProperties.SCHEMA, JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName)));
+            singleFileGenerationTaskConfig
+                .put(BatchConfigProperties.TABLE_CONFIGS, JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName)));
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(tableNumTasks));
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _clusterInfoAccessor.getVipUrl());
+            // Only submit raw data files with timestamp larger than checkpoint
+            pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+                singleFileGenerationTaskConfig));
+            tableNumTasks++;
+
+            // Generate up to tableMaxNumTasks tasks each time for each table
+            if (tableNumTasks == tableMaxNumTasks) {
+              break;
+            }
+          }
+        } catch (Exception e) {
+          LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
+              tableConfig, taskConfigs, e);
+        }
+      }
+    }
+    return pinotTaskConfigs;
+  }
+
+  private List<URI> getInputFilesFromDirectory(Map<String, String> batchConfigMap, URI inputDirURI,
+      Set<String> existingSegmentInputFileURIs) {
+    String inputDirURIScheme = inputDirURI.getScheme();
+    if (!PinotFSFactory.isSchemeSupported(inputDirURIScheme)) {
+      String fsClass = batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+      PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(batchConfigMap);
+      PinotFSFactory.register(inputDirURIScheme, fsClass, fsProps);
+    }
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURIScheme);
+
+    String includeFileNamePattern = batchConfigMap.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN);
+    String excludeFileNamePattern = batchConfigMap.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN);
+
+    //Get list of files to process
+    String[] files;
+    try {
+      files = inputDirFS.listFiles(inputDirURI, true);
+    } catch (IOException e) {
+      LOGGER.error("Unable to list files under URI: " + inputDirURI, e);
+      return Collections.emptyList();
+    }
+    PathMatcher includeFilePathMatcher = null;
+    if (includeFileNamePattern != null) {
+      includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(includeFileNamePattern);
+    }
+    PathMatcher excludeFilePathMatcher = null;
+    if (excludeFileNamePattern != null) {
+      excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(excludeFileNamePattern);
+    }
+    List<URI> inputFileURIs = new ArrayList<>();
+    for (String file : files) {
+      if (includeFilePathMatcher != null) {
+        if (!includeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (excludeFilePathMatcher != null) {
+        if (excludeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      try {
+        URI inputFileURI = new URI(file);
+        if (inputFileURI.getScheme() == null) {
+          inputFileURI = new File(file).toURI();
+        }
+        if (inputDirFS.isDirectory(inputFileURI) || existingSegmentInputFileURIs.contains(inputFileURI.toString())) {
+          continue;
+        }
+        inputFileURIs.add(inputFileURI);
+      } catch (Exception e) {
+        continue;
+      }
+    }
+    return inputFileURIs;
+  }
+
+  private Set<String> getExistingSegmentInputFiles(List<OfflineSegmentZKMetadata> offlineSegmentsMetadata) {
+    Set<String> existingSegmentInputFiles = new HashSet<>();
+    for (OfflineSegmentZKMetadata metadata : offlineSegmentsMetadata) {
+      if ((metadata.getCustomMap() != null) && metadata.getCustomMap()
+          .containsKey(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY)) {
+        existingSegmentInputFiles.add(metadata.getCustomMap().get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
+      }
+    }
+    return existingSegmentInputFiles;
+  }
+
+  private URI getDirectoryUri(String uriStr)
+      throws URISyntaxException {
+    URI uri = new URI(uriStr);
+    if (uri.getScheme() == null) {
+      uri = new File(uriStr).toURI();
+    }
+    return uri;
+  }
+
+  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    String outputDirStr = outputDir.toString();
+    outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir;
+    URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
+    return relativeOutputURI;
+  }
+}
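
getRelativeOutputPath mirrors the input directory layout under the output
directory. A worked example with illustrative URIs:

    URI baseInputDir = URI.create("s3://bucket/input/");
    URI inputFile = URI.create("s3://bucket/input/2020/12/data.csv");
    URI outputDir = URI.create("s3://bucket/output");
    // relativize() yields "2020/12/data.csv"; a trailing "/" is appended to the
    // output dir, and resolve(".") drops the file name:
    URI out = SegmentGenerationAndPushTaskGenerator.getRelativeOutputPath(
        baseInputDir, inputFile, outputDir);
    // out => s3://bucket/output/2020/12/
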
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index f112d8b..21070d3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -36,6 +36,7 @@ public class TaskGeneratorRegistry {
   public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) {
     registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
     registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
+    registerTaskGenerator(new SegmentGenerationAndPushTaskGenerator(clusterInfoAccessor));
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index cd98833..546a9fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -88,4 +88,10 @@ public class MinionConstants {
     public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
   }
+
+  // Generate segment and push to controller based on batch ingestion configs
+  public static class SegmentGenerationAndPushTask {
+    public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+  }
+
 }
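
To have the controller schedule this new task type for a table, the table
config must carry an entry for it. A hedged sketch, assuming TableTaskConfig's
map-based constructor and the TableConfigBuilder seen in the tests above
(tableMaxNumTasks is the optional per-table cap read by the generator):

    Map<String, Map<String, String>> taskTypeConfigs = new HashMap<>();
    Map<String, String> segmentGenerationTaskConfig = new HashMap<>();
    segmentGenerationTaskConfig.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, "10");
    taskTypeConfigs.put(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, segmentGenerationTaskConfig);
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("myTable")
        .setTaskConfig(new TableTaskConfig(taskTypeConfigs)) // assumed map-based constructor
        .build();
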
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index cc3608f..f9a442c 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -81,6 +81,11 @@
       <artifactId>pinot-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-batch-ingestion-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.helix</groupId>
       <artifactId>helix-core</artifactId>
       <exclusions>
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java
new file mode 100644
index 0000000..d1cabcb
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+
+
+/**
+ * The class <code>SegmentGenerationAndPushResult</code> wraps the segment generation and push
+ * results.
+ */
+public class SegmentGenerationAndPushResult {
+  private final boolean _succeed;
+  private final String _segmentName;
+  private final Exception _exception;
+  private final Map<String, Object> _customProperties;
+
+  private SegmentGenerationAndPushResult(boolean succeed, String segmentName, Exception exception,
+      Map<String, Object> customProperties) {
+    _succeed = succeed;
+    _segmentName = segmentName;
+    _exception = exception;
+    _customProperties = customProperties;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> T getCustomProperty(String key) {
+    return (T) _customProperties.get(key);
+  }
+
+  public Exception getException() {
+    return _exception;
+  }
+
+  public boolean isSucceed() {
+    return _succeed;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public static class Builder {
+    private boolean _succeed;
+    private String _segmentName;
+    private Exception _exception;
+    private final Map<String, Object> _customProperties = new HashMap<>();
+
+    public Builder setSucceed(boolean succeed) {
+      _succeed = succeed;
+      return this;
+    }
+
+    public void setSegmentName(String segmentName) {
+      _segmentName = segmentName;
+    }
+
+    public Builder setException(Exception exception) {
+      _exception = exception;
+      return this;
+    }
+
+    public Builder setCustomProperty(String key, Object property) {
+      _customProperties.put(key, property);
+      return this;
+    }
+
+    public SegmentGenerationAndPushResult build() {
+      return new SegmentGenerationAndPushResult(_succeed, _segmentName, _exception, _customProperties);
+    }
+  }
+}
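
A small usage sketch of the result builder (note that setSegmentName returns
void in this revision, so unlike the other setters it cannot be chained):

    SegmentGenerationAndPushResult.Builder resultBuilder =
        new SegmentGenerationAndPushResult.Builder();
    resultBuilder.setSegmentName("mySegment_0");
    SegmentGenerationAndPushResult result = resultBuilder
        .setSucceed(true)
        .setCustomProperty("segment.size.bytes", 1024L) // illustrative key
        .build();
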
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
new file mode 100644
index 0000000..0edb584
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
@@ -0,0 +1,187 @@
+package org.apache.pinot.minion.executor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.DataSizeUtils;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskExecutor.class);
+
+  private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
+
+  @Override
+  public Object executeTask(PinotTaskConfig pinotTaskConfig)
+      throws Exception {
+    Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
+    SegmentGenerationAndPushResult.Builder resultBuilder = new SegmentGenerationAndPushResult.Builder();
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
+    try {
+      // Generate Pinot Segment
+      SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+      URI inputFileURI = URI.create(taskConfigs.get(BatchConfigProperties.INPUT_FILE_URI));
+      File localInputTempDir = new File(localTempDir, "input");
+      FileUtils.forceMkdir(localInputTempDir);
+      File localOutputTempDir = new File(localTempDir, "output");
+      FileUtils.forceMkdir(localOutputTempDir);
+      String inputFileURIScheme = inputFileURI.getScheme();
+      if (inputFileURIScheme == null) {
+        inputFileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+      }
+      if (!PinotFSFactory.isSchemeSupported(inputFileURIScheme)) {
+        String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+        PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(taskConfigs);
+        PinotFSFactory.register(inputFileURIScheme, fsClass, fsProps);
+      }
+      PinotFS inputFileFS = PinotFSFactory.create(inputFileURIScheme);
+      URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+      String outputURIScheme = outputSegmentDirURI.getScheme();
+      if (outputURIScheme == null) {
+        outputURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+      }
+      PinotFS outputFileFS = PinotFSFactory.create(outputURIScheme);
+      //copy input path to local
+      File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
+      inputFileFS.copyToLocalFile(inputFileURI, localInputDataFile);
+      taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+      taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+
+      RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
+      recordReaderSpec.setDataFormat(taskConfigs.get(BatchConfigProperties.INPUT_FORMAT));
+      recordReaderSpec.setClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CLASS));
+      recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
+      taskSpec.setRecordReaderSpec(recordReaderSpec);
+      Schema schema;
+      if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) {
+        schema = JsonUtils
+            .stringToObject(JsonUtils.objectToString(taskConfigs.get(BatchConfigProperties.SCHEMA)), Schema.class);
+      } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) {
+        schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI));
+      } else {
+        throw new RuntimeException(
+            "Missing schema for segment generation job: please set `schema` or `schemaURI` in task config.");
+      }
+      taskSpec.setSchema(schema);
+      JsonNode tableConfig = JsonUtils.stringToJsonNode(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS));
+      taskSpec.setTableConfig(tableConfig);
+      taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID)));
+      SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
+      segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
+      segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
+          .getConfigMapWithPrefix(taskConfigs, BatchConfigProperties.SEGMENT_NAME_GENERATOR_CONFIGS));
+      taskSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
+      taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
+      SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+      String segmentName = taskRunner.run();
+      // Tar segment directory to compress file
+      File localSegmentDir = new File(localOutputTempDir, segmentName);
+      String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+      File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+      LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+      TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+      long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+      long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+      LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+          DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+      //move segment to output PinotFS
+      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + segmentTarFileName);
+      if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS
+          .exists(outputSegmentDirURI)) {
+        LOGGER.warn("Not overwrite existing output segment tar file: {}", outputFileFS.exists(outputSegmentDirURI));
+      } else {
+        outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+      }
+      resultBuilder.setSegmentName(segmentName);
+      // Segment push task
+      // Push the generated segment according to the configured push mode
+      String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+      PushJobSpec pushJobSpec = new PushJobSpec();
+      pushJobSpec.setPushAttempts(5);
+      pushJobSpec.setPushParallelism(1);
+      pushJobSpec.setPushRetryIntervalMillis(1000);
+      pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+      pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+      SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+      spec.setPushJobSpec(pushJobSpec);
+      TableSpec tableSpec = new TableSpec();
+      tableSpec.setTableName(tableConfig.get(BatchConfigProperties.TABLE_NAME).asText());
+      spec.setTableSpec(tableSpec);
+      PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+      pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+      PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+      spec.setPinotClusterSpecs(pinotClusterSpecs);
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try {
+            SegmentPushUtils.pushSegments(spec, LOCAL_PINOT_FS, Arrays.asList(outputSegmentTarURI.toString()));
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case URI:
+          try {
+            List<String> segmentUris = new ArrayList<>();
+            URI updatedURI = SegmentPushUtils
+                .generateSegmentTarURI(outputSegmentDirURI, outputSegmentTarURI, pushJobSpec.getSegmentUriPrefix(),
+                    pushJobSpec.getSegmentUriSuffix());
+            segmentUris.add(updatedURI.toString());
+            SegmentPushUtils.sendSegmentUris(spec, segmentUris);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case METADATA:
+          try {
+            Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+                .getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec.getSegmentUriPrefix(),
+                    pushJobSpec.getSegmentUriSuffix(), new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        default:
+          throw new Exception("Unrecognized push mode - " + pushMode);
+      }
+      resultBuilder.setSucceed(true);
+    } catch (Exception e) {
+      resultBuilder.setException(e);
+    } finally {
+      // Cleanup output dir
+      FileUtils.deleteQuietly(localTempDir);
+    }
+    return resultBuilder.build();
+  }
+}
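
A minimal sketch of the task configuration this executor consumes may help review. The
BatchConfigProperties constant names come from the code above; the literal values (local
file input, tar push mode, a local controller) are illustrative placeholders, not part of
this change:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;

    public class SegmentGenerationAndPushTaskConfigExample {
      // Hypothetical task configs; only the BatchConfigProperties constants are real,
      // every literal value below is an example.
      public static Map<String, String> exampleTaskConfigs() {
        Map<String, String> taskConfigs = new HashMap<>();
        taskConfigs.put(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, "file:///tmp/rawdata/data.avro");
        taskConfigs.put(BatchConfigProperties.PUSH_MODE, "tar");          // TAR, URI or METADATA
        taskConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI, "http://localhost:9000");
        taskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false"); // keep any existing tar file
        return taskConfigs;
      }
    }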
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
new file mode 100644
index 0000000..e4b6447
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
@@ -0,0 +1,8 @@
+package org.apache.pinot.minion.executor;
+
+public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExecutorFactory {
+  @Override
+  public PinotTaskExecutor create() {
+    return new SegmentGenerationAndPushTaskExecutor();
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 1b783dc..a86a39b 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -38,6 +38,7 @@ public class TaskExecutorFactoryRegistry {
     registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
         new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
+    registerTaskExecutorFactory(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+        new SegmentGenerationAndPushTaskExecutorFactory());
   }
 
   /**
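
The same registration hook can host additional task types; a minimal sketch, assuming a
hypothetical MyTaskExecutorFactory that implements PinotTaskExecutorFactory (the task type
string and factory name are illustrative, not part of this change):

    // Inside the registry constructor above, a custom task would be wired the same way:
    registerTaskExecutorFactory("MyTask", new MyTaskExecutorFactory());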
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java
new file mode 100644
index 0000000..9dc11b6
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import org.apache.pinot.spi.plugin.PluginManager;
+
+
+public class BatchQuickstartWithMinion extends Quickstart {
+
+  @Override
+  public String getBootstrapDataDir() {
+    return "examples/minions/batch/baseballStats";
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    PluginManager.get().init();
+    new BatchQuickstartWithMinion().execute();
+  }
+}
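
One way to launch this quickstart while reviewing is through its main method, e.g. (the
jar name is illustrative):

    java -cp pinot-tools-jar-with-dependencies.jar org.apache.pinot.tools.BatchQuickstartWithMinion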
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 0723b06..fa70e8a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -21,12 +21,24 @@ package org.apache.pinot.tools;
 import com.google.common.base.Preconditions;
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.Reader;
+import java.net.URI;
 import java.net.URL;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.minion.MinionClient;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.tools.admin.command.AddTableCommand;
 import org.apache.pinot.tools.utils.JarUtils;
 import org.slf4j.Logger;
@@ -36,9 +48,11 @@ import org.yaml.snakeyaml.Yaml;
 
 public class BootstrapTableTool {
   private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableTool.class);
+  private static final String COMPLETED = "COMPLETED";
   private final String _controllerHost;
   private final int _controllerPort;
   private final String _tableDir;
+  private final MinionClient _minionClient;
 
   public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) {
     Preconditions.checkNotNull(controllerHost);
@@ -46,11 +60,13 @@ public class BootstrapTableTool {
     _controllerHost = controllerHost;
     _controllerPort = controllerPort;
     _tableDir = tableDir;
+    _minionClient = new MinionClient(controllerHost, String.valueOf(controllerPort));
   }
 
   public boolean execute()
       throws Exception {
     File setupTableTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
+    setupTableTmpDir.mkdirs();
 
     File tableDir = new File(_tableDir);
     String tableName = tableDir.getName();
@@ -95,41 +111,121 @@ public class BootstrapTableTool {
         .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerHost(_controllerHost)
         .setControllerPort(String.valueOf(_controllerPort)).setExecute(true).execute();
   }
+
   private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
       File offlineTableConfigFile, File ingestionJobSpecFile)
       throws Exception {
-    LOGGER.info("Adding offline table: {}", tableName);
-    boolean tableCreationResult = createTable(schemaFile, offlineTableConfigFile);
+    TableConfig tableConfig =
+        JsonUtils.inputStreamToObject(new FileInputStream(offlineTableConfigFile), TableConfig.class);
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null) {
+      updateTableConfig(tableConfig, tableName, setupTableTmpDir);
+    }
 
+    LOGGER.info("Adding offline table: {}", tableName);
+    File updatedTableConfigFile =
+        new File(setupTableTmpDir, String.format("%s_%d.config", tableName, System.currentTimeMillis()));
+    try (FileOutputStream outputStream = new FileOutputStream(updatedTableConfigFile)) {
+      outputStream.write(JsonUtils.objectToPrettyString(tableConfig).getBytes());
+    }
+    boolean tableCreationResult = createTable(schemaFile, updatedTableConfigFile);
     if (!tableCreationResult) {
       throw new RuntimeException(String
           .format("Unable to create offline table - %s from schema file [%s] and table conf file [%s].", tableName,
               schemaFile, offlineTableConfigFile));
     }
+    if (tableConfig.getTaskConfig() != null) {
+      _minionClient.scheduleMinionTasks();
+      waitForMinionTaskToFinish(30_000L);
+    }
+    if (ingestionJobSpecFile != null) {
+      if (ingestionJobSpecFile.exists()) {
+        LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
+            tableName, _controllerHost, _controllerPort);
+        try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
+          SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
+          String inputDirURI = spec.getInputDirURI();
+          if (!new File(inputDirURI).exists()) {
+            URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+            if (resolvedInputDirURI.getProtocol().equals("jar")) {
+              String[] splits = resolvedInputDirURI.getFile().split("!");
+              String inputDir = new File(setupTableTmpDir, "inputData").toString();
+              JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
+              spec.setInputDirURI(inputDir);
+            } else {
+              spec.setInputDirURI(resolvedInputDirURI.toString());
+            }
+          }
+          IngestionJobLauncher.runIngestionJob(spec);
+        }
+      } else {
+        LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
+            ingestionJobSpecFile.getAbsolutePath());
+      }
+    }
+    return true;
+  }
 
-    if (ingestionJobSpecFile.exists()) {
-      LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
-          tableName, _controllerHost, _controllerPort);
-      try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
-        SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
-        String inputDirURI = spec.getInputDirURI();
-        if (!new File(inputDirURI).exists()) {
-          URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
-          if (resolvedInputDirURI.getProtocol().equals("jar")) {
+  private void updateTableConfig(TableConfig tableConfig, String tableName, File setupTableTmpDir)
+      throws Exception {
+    final List<Map<String, String>> batchConfigsMaps =
+        tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
+    for (Map<String, String> batchConfigsMap : batchConfigsMaps) {
+      BatchConfig batchConfig = new BatchConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName), batchConfigsMap);
+      String inputDirURI = batchConfig.getInputDirURI();
+      if (!new File(inputDirURI).exists()) {
+        URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+        if (resolvedInputDirURI != null) {
+          if ("jar".equals(resolvedInputDirURI.getProtocol())) {
             String[] splits = resolvedInputDirURI.getFile().split("!");
-            String inputDir = new File(setupTableTmpDir, "inputData").toString();
-            JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
-            spec.setInputDirURI(inputDir);
+            File inputDir = new File(setupTableTmpDir, "inputData");
+            JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir.toString());
+            batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir.toURI().toString());
+            batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
+                new File(inputDir.getParent(), "segments").toURI().toString());
           } else {
-            spec.setInputDirURI(resolvedInputDirURI.toString());
+            final URI inputURI = resolvedInputDirURI.toURI();
+            batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputURI.toString());
+            URI outputURI =
+                inputURI.getPath().endsWith("/") ? inputURI.resolve("../segments") : inputURI.resolve("./segments");
+            batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputURI.toString());
           }
         }
-        IngestionJobLauncher.runIngestionJob(spec);
       }
-    } else {
-      LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
-          ingestionJobSpecFile.getAbsolutePath());
     }
-    return true;
+  }
+
+  private boolean waitForMinionTaskToFinish(long timeoutInMillis) {
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < timeoutInMillis) {
+      try {
+        Thread.sleep(500L);
+      } catch (InterruptedException e) {
+        // Ignore the interrupt and keep polling until the timeout
+      }
+      try {
+        final Map<String, String> taskStatesMap =
+            _minionClient.getTasksStates(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+        if (taskStatesMap.isEmpty()) {
+          LOGGER.info("No scheduled tasks yet, sleep 500 millis seconds");
+          continue;
+        }
+        boolean allCompleted = true;
+        for (String taskState : taskStatesMap.values()) {
+          if (!COMPLETED.equalsIgnoreCase(taskState)) {
+            allCompleted = false;
+            break;
+          }
+        }
+        if (allCompleted) {
+          LOGGER.info("All minion tasks are completed.");
+          return true;
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to query task endpoint", e);
+        continue;
+      }
+    }
+    return false;
   }
 }
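
A quick illustration of the relative-URI resolution used in updateTableConfig above to
derive the segment output directory next to the input directory (the paths are
illustrative):

    import java.net.URI;

    public class OutputUriResolutionExample {
      public static void main(String[] args) {
        URI input = URI.create("file:/tmp/bootstrap/inputData/");
        // With a trailing slash, "../segments" resolves against the parent directory,
        // matching the branch in updateTableConfig
        URI output = input.getPath().endsWith("/") ? input.resolve("../segments") : input.resolve("./segments");
        System.out.println(output);  // prints file:/tmp/bootstrap/segments
      }
    }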
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index 69f5941..475a8ec 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -43,6 +43,10 @@ public class Quickstart {
     }
   }
 
+  public String getBootstrapDataDir() {
+    return "examples/batch/baseballStats";
+  }
+
   public static void printStatus(Color color, String message) {
     System.out.println(color._code + message + Color.RESET._code);
   }
@@ -144,16 +148,17 @@ public class Quickstart {
     File dataFile = new File(dataDir, "baseballStats_data.csv");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
+    URL resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_schema.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, schemaFile);
-    resource = classLoader.getResource("examples/batch/baseballStats/rawdata/baseballStats_data.csv");
+    resource = classLoader.getResource(getBootstrapDataDir() + "/rawdata/baseballStats_data.csv");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, dataFile);
-    resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml");
-    com.google.common.base.Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
-    resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json");
+    resource = classLoader.getResource(getBootstrapDataDir() + "/ingestionJobSpec.yaml");
+    if (resource != null) {
+      FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
+    }
+    resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_offline_table_config.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index e2e3e38..072effd 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -53,6 +53,8 @@ public class QuickstartRunner {
   private static final int DEFAULT_SERVER_ADMIN_API_PORT = 7500;
   private static final int DEFAULT_BROKER_PORT = 8000;
   private static final int DEFAULT_CONTROLLER_PORT = 9000;
+  private static final int DEFAULT_MINION_PORT = 6000;
 
   private static final String DEFAULT_ZK_DIR = "PinotZkDir";
   private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir";
@@ -63,6 +65,7 @@ public class QuickstartRunner {
   private final int _numServers;
   private final int _numBrokers;
   private final int _numControllers;
+  private final int _numMinions;
   private final File _tempDir;
   private final boolean _enableTenantIsolation;
 
@@ -74,10 +77,17 @@ public class QuickstartRunner {
   public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers,
       int numControllers, File tempDir, boolean enableIsolation)
       throws Exception {
+    this(tableRequests, numServers, numBrokers, numControllers, 1, tempDir, enableIsolation);
+  }
+
+  public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers,
+      int numControllers, int numMinions, File tempDir, boolean enableIsolation)
+      throws Exception {
     _tableRequests = tableRequests;
     _numServers = numServers;
     _numBrokers = numBrokers;
     _numControllers = numControllers;
+    _numMinions = numMinions;
     _tempDir = tempDir;
     _enableTenantIsolation = enableIsolation;
     clean();
@@ -131,6 +141,16 @@ public class QuickstartRunner {
     }
   }
 
+  private void startMinions()
+      throws Exception {
+    for (int i = 0; i < _numMinions; i++) {
+      StartMinionCommand minionStarter = new StartMinionCommand();
+      minionStarter.setMinionPort(DEFAULT_MINION_PORT + i)
+          .setZkAddress(ZK_ADDRESS).setClusterName(CLUSTER_NAME);
+      minionStarter.execute();
+    }
+  }
+
   private void clean()
       throws Exception {
     FileUtils.cleanDirectory(_tempDir);
@@ -142,6 +162,7 @@ public class QuickstartRunner {
     startControllers();
     startBrokers();
     startServers();
+    startMinions();
   }
 
   public void stop()
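
A hedged sketch of a caller using the new overload (the table requests and temp dir are
placeholders; the argument order mirrors the signature above):

    // (tableRequests, numServers, numBrokers, numControllers, numMinions, tempDir, enableIsolation)
    QuickstartRunner runner = new QuickstartRunner(tableRequests, 1, 1, 1, 1, tempDir, true);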
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
index 4ba95bc..74cb81f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
@@ -110,4 +110,24 @@ public class StartMinionCommand extends AbstractBaseAdminCommand implements Comm
     }
     return PinotConfigUtils.generateMinionConf(_minionHost, _minionPort);
   }
+
+  public StartMinionCommand setMinionHost(String minionHost) {
+    _minionHost = minionHost;
+    return this;
+  }
+
+  public StartMinionCommand setMinionPort(int minionPort) {
+    _minionPort = minionPort;
+    return this;
+  }
+
+  public StartMinionCommand setZkAddress(String zkAddress) {
+    _zkAddress = zkAddress;
+    return this;
+  }
+
+  public StartMinionCommand setClusterName(String clusterName) {
+    _clusterName = clusterName;
+    return this;
+  }
 }
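
With the new setters, a minion can also be started programmatically outside the
quickstart; a minimal sketch (the ZooKeeper address and cluster name are illustrative):

    StartMinionCommand minionStarter = new StartMinionCommand();
    minionStarter.setMinionHost("localhost").setMinionPort(6000)
        .setZkAddress("localhost:2181").setClusterName("PinotCluster");
    minionStarter.execute();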

