Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/09 08:34:03 UTC

[incubator-pinot] 02/02: Adding pinot minion segment creation task

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f7c9ddb50d9e24cd0f8487162833c478f37a6329
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Dec 9 00:33:26 2020 -0800

    Adding pinot minion segment creation task
---
 .../SegmentGenerationAndPushTaskGenerator.java     | 241 +++++++++++++++++++++
 .../minion/generator/TaskGeneratorRegistry.java    |   1 +
 .../apache/pinot/core/common/MinionConstants.java  |   6 +
 pinot-minion/pom.xml                               |   5 +
 .../executor/SegmentGenerationAndPushResult.java   |  91 ++++++++
 .../SegmentGenerationAndPushTaskExecutor.java      | 187 ++++++++++++++++
 ...egmentGenerationAndPushTaskExecutorFactory.java |   8 +
 .../executor/TaskExecutorFactoryRegistry.java      |   1 +
 .../pinot/tools/BatchQuickstartWithMinion.java     |  35 +++
 .../org/apache/pinot/tools/BootstrapTableTool.java | 136 ++++++++++--
 .../java/org/apache/pinot/tools/Quickstart.java    |  17 +-
 .../tools/admin/command/QuickstartRunner.java      |  21 ++
 .../tools/admin/command/StartMinionCommand.java    |  20 ++
 13 files changed, 743 insertions(+), 26 deletions(-)
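
For context: the generator below only fires for OFFLINE tables whose table config carries
both a task config entry for SegmentGenerationAndPushTask and a batch ingestion config. A
minimal sketch of building such a table config programmatically (not part of this commit;
the table name, URIs, and the IngestionConfig/BatchIngestionConfig constructor shapes are
assumptions for the Pinot version at hand):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableTaskConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
    import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
    import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class EnableSegmentGenerationTaskSketch {
      public static TableConfig buildTableConfig() {
        // One batch config map per input location (keys come from BatchConfigProperties)
        Map<String, String> batchConfigMap = new HashMap<>();
        batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "file:///tmp/myTable/rawdata/");
        batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "file:///tmp/myTable/segments/");
        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "csv");
        batchConfigMap.put(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN, "glob:**/*.csv");

        // APPEND mode makes the generator skip input files that already produced segments
        BatchIngestionConfig batchIngestionConfig =
            new BatchIngestionConfig(Collections.singletonList(batchConfigMap), "APPEND", "DAILY");
        // Constructor arity here is an assumption for this Pinot version
        IngestionConfig ingestionConfig = new IngestionConfig(batchIngestionConfig, null, null, null);

        // An empty per-task config map is enough for the generator to pick the table up
        TableTaskConfig taskConfig = new TableTaskConfig(
            Collections.singletonMap("SegmentGenerationAndPushTask", new HashMap<>()));

        return new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
            .setIngestionConfig(ingestionConfig).setTaskConfig(taskConfig).build();
      }
    }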

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
new file mode 100644
index 0000000..0d05472
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
+
+  private final ClusterInfoAccessor _clusterInfoAccessor;
+
+  public SegmentGenerationAndPushTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      // Only generate tasks for OFFLINE tables
+      String offlineTableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() != TableType.OFFLINE) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkNotNull(tableTaskConfig);
+      Map<String, String> taskConfigs =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for table: %s", offlineTableName);
+
+      // Get max number of tasks for this table
+      int tableMaxNumTasks;
+      String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+      if (tableMaxNumTasksConfig != null) {
+        try {
+          tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+        } catch (Exception e) {
+          tableMaxNumTasks = Integer.MAX_VALUE;
+        }
+      } else {
+        tableMaxNumTasks = Integer.MAX_VALUE;
+      }
+
+      // Generate tasks
+      int tableNumTasks = 0;
+      // Skip the table entirely if it is configured to generate zero tasks
+      if (tableNumTasks == tableMaxNumTasks) {
+        continue;
+      }
+      String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+      String batchSegmentIngestionFrequency = IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig);
+      BatchIngestionConfig batchIngestionConfig = tableConfig.getIngestionConfig().getBatchIngestionConfig();
+      List<Map<String, String>> batchConfigMaps = batchIngestionConfig.getBatchConfigMaps();
+      for (Map<String, String> batchConfigMap : batchConfigMaps) {
+        try {
+          URI inputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+          URI outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI));
+
+          List<OfflineSegmentZKMetadata> offlineSegmentsMetadata = Collections.emptyList();
+          // For APPEND mode, skip input files that already have segments created from them
+          if (BatchConfigProperties.SegmentIngestionType.APPEND.name().equalsIgnoreCase(batchSegmentIngestionType)) {
+            offlineSegmentsMetadata = _clusterInfoAccessor.getOfflineSegmentsMetadata(offlineTableName);
+          }
+          List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI,
+              getExistingSegmentInputFiles(offlineSegmentsMetadata));
+
+          String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
+          for (URI inputFileURI : inputFileURIs) {
+            Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(batchConfigMap);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.INPUT_FILE_URI, inputFileURI.toString());
+            URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
+            singleFileGenerationTaskConfig
+                .put(BatchConfigProperties.SCHEMA, JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName)));
+            singleFileGenerationTaskConfig
+                .put(BatchConfigProperties.TABLE_CONFIGS, JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName)));
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(tableNumTasks));
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _clusterInfoAccessor.getVipUrl());
+            // Submit a single-file segment generation and push task for this input file
+            pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+                singleFileGenerationTaskConfig));
+            tableNumTasks++;
+
+            // Generate up to tableMaxNumTasks tasks each time for each table
+            if (tableNumTasks == tableMaxNumTasks) {
+              break;
+            }
+          }
+        } catch (Exception e) {
+          LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
+              tableConfig, taskConfigs, e);
+        }
+      }
+    }
+    return pinotTaskConfigs;
+  }
+
+  private List<URI> getInputFilesFromDirectory(Map<String, String> batchConfigMap, URI inputDirURI,
+      Set<String> existingSegmentInputFileURIs) {
+    String inputDirURIScheme = inputDirURI.getScheme();
+    if (!PinotFSFactory.isSchemeSupported(inputDirURIScheme)) {
+      String fsClass = batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+      PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(batchConfigMap);
+      PinotFSFactory.register(inputDirURIScheme, fsClass, fsProps);
+    }
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURIScheme);
+
+    String includeFileNamePattern = batchConfigMap.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN);
+    String excludeFileNamePattern = batchConfigMap.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN);
+
+    // Get list of files to process
+    String[] files;
+    try {
+      files = inputDirFS.listFiles(inputDirURI, true);
+    } catch (IOException e) {
+      LOGGER.error("Unable to list files under URI: " + inputDirURI, e);
+      return Collections.emptyList();
+    }
+    PathMatcher includeFilePathMatcher = null;
+    if (includeFileNamePattern != null) {
+      includeFilePathMatcher = FileSystems.getDefault().getPathMatcher(includeFileNamePattern);
+    }
+    PathMatcher excludeFilePathMatcher = null;
+    if (excludeFileNamePattern != null) {
+      excludeFilePathMatcher = FileSystems.getDefault().getPathMatcher(excludeFileNamePattern);
+    }
+    List<URI> inputFileURIs = new ArrayList<>();
+    for (String file : files) {
+      if (includeFilePathMatcher != null) {
+        if (!includeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (excludeFilePathMatcher != null) {
+        if (excludeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      try {
+        URI inputFileURI = new URI(file);
+        if (inputFileURI.getScheme() == null) {
+          inputFileURI = new File(file).toURI();
+        }
+        if (inputDirFS.isDirectory(inputFileURI) || existingSegmentInputFileURIs.contains(inputFileURI.toString())) {
+          continue;
+        }
+        inputFileURIs.add(inputFileURI);
+      } catch (Exception e) {
+        // Skip files that cannot be parsed into a valid URI
+        continue;
+      }
+    }
+    return inputFileURIs;
+  }
+
+  private Set<String> getExistingSegmentInputFiles(List<OfflineSegmentZKMetadata> offlineSegmentsMetadata) {
+    Set<String> existingSegmentInputFiles = new HashSet<>();
+    for (OfflineSegmentZKMetadata metadata : offlineSegmentsMetadata) {
+      if ((metadata.getCustomMap() != null) && metadata.getCustomMap()
+          .containsKey(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY)) {
+        existingSegmentInputFiles.add(metadata.getCustomMap().get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
+      }
+    }
+    return existingSegmentInputFiles;
+  }
+
+  private URI getDirectoryUri(String uriStr)
+      throws URISyntaxException {
+    URI uri = new URI(uriStr);
+    if (uri.getScheme() == null) {
+      uri = new File(uriStr).toURI();
+    }
+    return uri;
+  }
+
+  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    String outputDirStr = outputDir.toString();
+    outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir;
+    URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
+    return relativeOutputURI;
+  }
+}
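
A quick illustration of the getRelativeOutputPath() helper above, showing how the input
directory layout is mirrored under the output directory (URIs invented for the example):

    import java.net.URI;

    public class RelativeOutputPathExample {
      public static void main(String[] args) {
        URI baseInputDir = URI.create("file:///data/myTable/rawdata/");
        URI inputFile = URI.create("file:///data/myTable/rawdata/2020/12/part-0.csv");
        URI outputDir = URI.create("file:///data/myTable/segments");

        // Mirrors SegmentGenerationAndPushTaskGenerator.getRelativeOutputPath:
        URI relativePath = baseInputDir.relativize(inputFile);  // 2020/12/part-0.csv
        String outputDirStr = outputDir.toString();
        outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr + "/") : outputDir;
        URI result = outputDir.resolve(relativePath).resolve(".");
        System.out.println(result);  // file:///data/myTable/segments/2020/12/
      }
    }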
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index f112d8b..21070d3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -36,6 +36,7 @@ public class TaskGeneratorRegistry {
   public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) {
     registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
     registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
+    registerTaskGenerator(new SegmentGenerationAndPushTaskGenerator(clusterInfoAccessor));
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index cd98833..546a9fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -88,4 +88,10 @@ public class MinionConstants {
     public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
   }
+
+  // Generate segment and push to controller based on batch ingestion configs
+  public static class SegmentGenerationAndPushTask {
+    public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+  }
+
 }
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index cc3608f..f9a442c 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -81,6 +81,11 @@
       <artifactId>pinot-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-batch-ingestion-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.helix</groupId>
       <artifactId>helix-core</artifactId>
       <exclusions>
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java
new file mode 100644
index 0000000..d1cabcb
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * The class <code>SegmentGenerationAndPushResult</code> wraps the segment generation and push
+ * results.
+ */
+public class SegmentGenerationAndPushResult {
+  private final boolean _succeed;
+  private final String _segmentName;
+  private final Exception _exception;
+  private final Map<String, Object> _customProperties;
+
+  private SegmentGenerationAndPushResult(boolean succeed, String segmentName, Exception exception,
+      Map<String, Object> customProperties) {
+    _succeed = succeed;
+    _segmentName = segmentName;
+    _exception = exception;
+    _customProperties = customProperties;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> T getCustomProperty(String key) {
+    return (T) _customProperties.get(key);
+  }
+
+  public Exception getException() {
+    return _exception;
+  }
+
+  public boolean isSucceed() {
+    return _succeed;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public static class Builder {
+    private boolean _succeed;
+    private String _segmentName;
+    private Exception _exception;
+    private final Map<String, Object> _customProperties = new HashMap<>();
+
+    public Builder setSucceed(boolean succeed) {
+      _succeed = succeed;
+      return this;
+    }
+
+    public Builder setSegmentName(String segmentName) {
+      _segmentName = segmentName;
+      return this;
+    }
+
+    public Builder setException(Exception exception) {
+      _exception = exception;
+      return this;
+    }
+
+    public Builder setCustomProperty(String key, Object property) {
+      _customProperties.put(key, property);
+      return this;
+    }
+
+    public SegmentGenerationAndPushResult build() {
+      return new SegmentGenerationAndPushResult(_succeed, _segmentName, _exception, _customProperties);
+    }
+  }
+}
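
An illustrative use of the result builder above (segment name and property key invented):

    import org.apache.pinot.minion.executor.SegmentGenerationAndPushResult;

    public class ResultBuilderExample {
      public static void main(String[] args) {
        SegmentGenerationAndPushResult result = new SegmentGenerationAndPushResult.Builder()
            .setSucceed(true)
            .setSegmentName("myTable_0")                   // hypothetical segment name
            .setCustomProperty("segment.tar.size", 1024L)  // hypothetical property key
            .build();
        if (result.isSucceed()) {
          Long tarSize = result.getCustomProperty("segment.tar.size");
          System.out.println(result.getSegmentName() + " size: " + tarSize);
        }
      }
    }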
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
new file mode 100644
index 0000000..0edb584
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.DataSizeUtils;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskExecutor.class);
+
+  private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
+
+  @Override
+  public Object executeTask(PinotTaskConfig pinotTaskConfig)
+      throws Exception {
+    Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
+    SegmentGenerationAndPushResult.Builder resultBuilder = new SegmentGenerationAndPushResult.Builder();
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
+    try {
+      // Generate Pinot Segment
+      SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+      URI inputFileURI = URI.create(taskConfigs.get(BatchConfigProperties.INPUT_FILE_URI));
+      File localInputTempDir = new File(localTempDir, "input");
+      FileUtils.forceMkdir(localInputTempDir);
+      File localOutputTempDir = new File(localTempDir, "output");
+      FileUtils.forceMkdir(localOutputTempDir);
+      String inputFileURIScheme = inputFileURI.getScheme();
+      if (inputFileURIScheme == null) {
+        inputFileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+      }
+      if (!PinotFSFactory.isSchemeSupported(inputFileURIScheme)) {
+        String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+        PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(taskConfigs);
+        PinotFSFactory.register(inputFileURIScheme, fsClass, fsProps);
+      }
+      PinotFS inputFileFS = PinotFSFactory.create(inputFileURIScheme);
+      URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+      String outputURIScheme = outputSegmentDirURI.getScheme();
+      if (outputURIScheme == null) {
+        outputURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+      }
+      PinotFS outputFileFS = PinotFSFactory.create(outputURIScheme);
+      // Copy the input file to a local temp dir
+      File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
+      inputFileFS.copyToLocalFile(inputFileURI, localInputDataFile);
+      taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+      taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+
+      RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
+      recordReaderSpec.setDataFormat(taskConfigs.get(BatchConfigProperties.INPUT_FORMAT));
+      recordReaderSpec.setClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CLASS));
+      recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
+      taskSpec.setRecordReaderSpec(recordReaderSpec);
+      Schema schema;
+      if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) {
+        // The schema is stored as a JSON string in the task config; deserialize it directly
+        schema = JsonUtils.stringToObject(taskConfigs.get(BatchConfigProperties.SCHEMA), Schema.class);
+      } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) {
+        schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI));
+      } else {
+        throw new RuntimeException(
+            "Missing schema for segment generation job: please set `schema` or `schemaURI` in task config.");
+      }
+      taskSpec.setSchema(schema);
+      JsonNode tableConfig = JsonUtils.stringToJsonNode(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS));
+      taskSpec.setTableConfig(tableConfig);
+      taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID)));
+      SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
+      segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
+      segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
+          .getConfigMapWithPrefix(taskConfigs, BatchConfigProperties.SEGMENT_NAME_GENERATOR_CONFIGS));
+      taskSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
+      taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
+      SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+      String segmentName = taskRunner.run();
+      // Tar segment directory to compress file
+      File localSegmentDir = new File(localOutputTempDir, segmentName);
+      String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+      File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+      LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+      TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+      long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+      long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+      LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+          DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+      // Move the segment tar file to the output PinotFS
+      URI outputSegmentTarURI = URI.create(outputSegmentDirURI + segmentTarFileName);
+      if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS
+          .exists(outputSegmentTarURI)) {
+        LOGGER.warn("Skip overwriting existing output segment tar file: {}", outputSegmentTarURI);
+      } else {
+        outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+      }
+      resultBuilder.setSegmentName(segmentName);
+      // Push the generated segment based on the configured push mode
+      String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+      PushJobSpec pushJobSpec = new PushJobSpec();
+      pushJobSpec.setPushAttempts(5);
+      pushJobSpec.setPushParallelism(1);
+      pushJobSpec.setPushRetryIntervalMillis(1000);
+      pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+      pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+      SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+      spec.setPushJobSpec(pushJobSpec);
+      TableSpec tableSpec = new TableSpec();
+      tableSpec.setTableName(tableConfig.get(BatchConfigProperties.TABLE_NAME).asText());
+      spec.setTableSpec(tableSpec);
+      PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+      pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+      PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+      spec.setPinotClusterSpecs(pinotClusterSpecs);
+      switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+        case TAR:
+          try {
+            SegmentPushUtils.pushSegments(spec, LOCAL_PINOT_FS, Arrays.asList(outputSegmentTarURI.toString()));
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case URI:
+          try {
+            List<String> segmentUris = new ArrayList<>();
+            URI updatedURI = SegmentPushUtils
+                .generateSegmentTarURI(outputSegmentDirURI, outputSegmentTarURI, pushJobSpec.getSegmentUriPrefix(),
+                    pushJobSpec.getSegmentUriSuffix());
+            segmentUris.add(updatedURI.toString());
+            SegmentPushUtils.sendSegmentUris(spec, segmentUris);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        case METADATA:
+          try {
+            Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+                .getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec.getSegmentUriPrefix(),
+                    pushJobSpec.getSegmentUriSuffix(), new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap);
+          } catch (RetriableOperationException | AttemptsExceededException e) {
+            throw new RuntimeException(e);
+          }
+          break;
+        default:
+          throw new Exception("Unrecognized push mode - " + pushMode);
+      }
+      resultBuilder.setSucceed(true);
+    } catch (Exception e) {
+      resultBuilder.setException(e);
+    } finally {
+      // Cleanup output dir
+      FileUtils.deleteQuietly(localTempDir);
+    }
+    return resultBuilder.build();
+  }
+}
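
The executor above consumes the task config map written by the generator. A hand-built
sketch of the minimal keys involved (not part of this commit; the URI and controller
values are invented, and the real generator sets the SIMPLE name-generator constant
rather than a literal):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.minion.PinotTaskConfig;
    import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;

    public class TaskConfigSketch {
      public static PinotTaskConfig minimalTaskConfig(String tableConfigJson) {
        Map<String, String> configs = new HashMap<>();
        configs.put(BatchConfigProperties.INPUT_FILE_URI, "file:///tmp/myTable/rawdata/part-0.csv");
        configs.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, "file:///tmp/myTable/segments/");
        configs.put(BatchConfigProperties.INPUT_FORMAT, "csv");
        configs.put(BatchConfigProperties.SCHEMA_URI, "http://localhost:9000/tables/myTable/schema");
        configs.put(BatchConfigProperties.TABLE_CONFIGS, tableConfigJson);  // JSON of the table config
        configs.put(BatchConfigProperties.SEQUENCE_ID, "0");
        configs.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, "simple");
        configs.put(BatchConfigProperties.PUSH_MODE, "tar");  // matched case-insensitively against SegmentPushType
        configs.put(BatchConfigProperties.PUSH_CONTROLLER_URI, "http://localhost:9000");
        return new PinotTaskConfig("SegmentGenerationAndPushTask", configs);
      }
    }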
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
new file mode 100644
index 0000000..e4b6447
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
@@ -0,0 +1,8 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExecutorFactory {
+  @Override
+  public PinotTaskExecutor create() {
+    return new SegmentGenerationAndPushTaskExecutor();
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 1b783dc..a86a39b 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -38,6 +38,7 @@ public class TaskExecutorFactoryRegistry {
     registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
         new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
+    registerTaskExecutorFactory(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new SegmentGenerationAndPushTaskExecutorFactory());
   }
 
   /**
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java
new file mode 100644
index 0000000..9dc11b6
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import org.apache.pinot.spi.plugin.PluginManager;
+
+
+public class BatchQuickstartWithMinion extends Quickstart {
+
+  public String getBootstrapDataDir() {
+    return "examples/minions/batch/baseballStats";
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    PluginManager.get().init();
+    new BatchQuickstartWithMinion().execute();
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 0723b06..fa70e8a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -21,12 +21,24 @@ package org.apache.pinot.tools;
 import com.google.common.base.Preconditions;
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.Reader;
+import java.net.URI;
 import java.net.URL;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.minion.MinionClient;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.tools.admin.command.AddTableCommand;
 import org.apache.pinot.tools.utils.JarUtils;
 import org.slf4j.Logger;
@@ -36,9 +48,11 @@ import org.yaml.snakeyaml.Yaml;
 
 public class BootstrapTableTool {
   private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableTool.class);
+  private static final String COMPLETED = "COMPLETED";
   private final String _controllerHost;
   private final int _controllerPort;
   private final String _tableDir;
+  private final MinionClient _minionClient;
 
   public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) {
     Preconditions.checkNotNull(controllerHost);
@@ -46,11 +60,13 @@ public class BootstrapTableTool {
     _controllerHost = controllerHost;
     _controllerPort = controllerPort;
     _tableDir = tableDir;
+    _minionClient = new MinionClient(controllerHost, String.valueOf(controllerPort));
   }
 
   public boolean execute()
       throws Exception {
     File setupTableTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
+    setupTableTmpDir.mkdirs();
 
     File tableDir = new File(_tableDir);
     String tableName = tableDir.getName();
@@ -95,41 +111,121 @@ public class BootstrapTableTool {
         .setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerHost(_controllerHost)
         .setControllerPort(String.valueOf(_controllerPort)).setExecute(true).execute();
   }
+
   private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
       File offlineTableConfigFile, File ingestionJobSpecFile)
       throws Exception {
-    LOGGER.info("Adding offline table: {}", tableName);
-    boolean tableCreationResult = createTable(schemaFile, offlineTableConfigFile);
+    TableConfig tableConfig =
+        JsonUtils.inputStreamToObject(new FileInputStream(offlineTableConfigFile), TableConfig.class);
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null) {
+      updateTableConfig(tableConfig, tableName, setupTableTmpDir);
+    }
 
+    LOGGER.info("Adding offline table: {}", tableName);
+    File updatedTableConfigFile =
+        new File(setupTableTmpDir, String.format("%s_%d.config", tableName, System.currentTimeMillis()));
+    FileOutputStream outputStream = new FileOutputStream(updatedTableConfigFile);
+    outputStream.write(JsonUtils.objectToPrettyString(tableConfig).getBytes());
+    outputStream.close();
+    boolean tableCreationResult = createTable(schemaFile, updatedTableConfigFile);
     if (!tableCreationResult) {
       throw new RuntimeException(String
           .format("Unable to create offline table - %s from schema file [%s] and table conf file [%s].", tableName,
               schemaFile, offlineTableConfigFile));
     }
+    if (tableConfig.getTaskConfig() != null) {
+      _minionClient.scheduleMinionTasks();
+      waitForMinionTaskToFinish(30_000L);
+    }
+    if (ingestionJobSpecFile != null) {
+      if (ingestionJobSpecFile.exists()) {
+        LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
+            tableName, _controllerHost, _controllerPort);
+        try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
+          SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
+          String inputDirURI = spec.getInputDirURI();
+          if (!new File(inputDirURI).exists()) {
+            URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+            if (resolvedInputDirURI.getProtocol().equals("jar")) {
+              String[] splits = resolvedInputDirURI.getFile().split("!");
+              String inputDir = new File(setupTableTmpDir, "inputData").toString();
+              JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
+              spec.setInputDirURI(inputDir);
+            } else {
+              spec.setInputDirURI(resolvedInputDirURI.toString());
+            }
+          }
+          IngestionJobLauncher.runIngestionJob(spec);
+        }
+      } else {
+        LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
+            ingestionJobSpecFile.getAbsolutePath());
+      }
+    }
+    return true;
+  }
 
-    if (ingestionJobSpecFile.exists()) {
-      LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
-          tableName, _controllerHost, _controllerPort);
-      try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
-        SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
-        String inputDirURI = spec.getInputDirURI();
-        if (!new File(inputDirURI).exists()) {
-          URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
-          if (resolvedInputDirURI.getProtocol().equals("jar")) {
+  private void updateTableConfig(TableConfig tableConfig, String tableName, File setupTableTmpDir)
+      throws Exception {
+    final List<Map<String, String>> batchConfigsMaps =
+        tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
+    for (Map<String, String> batchConfigsMap : batchConfigsMaps) {
+      BatchConfig batchConfig = new BatchConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName), batchConfigsMap);
+      String inputDirURI = batchConfig.getInputDirURI();
+      if (!new File(inputDirURI).exists()) {
+        URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+        if (resolvedInputDirURI != null) {
+          if ("jar".equals(resolvedInputDirURI.getProtocol())) {
             String[] splits = resolvedInputDirURI.getFile().split("!");
-            String inputDir = new File(setupTableTmpDir, "inputData").toString();
-            JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
-            spec.setInputDirURI(inputDir);
+            File inputDir = new File(setupTableTmpDir, "inputData");
+            JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir.toString());
+            batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir.toURI().toString());
+            batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
+                new File(inputDir.getParent(), "segments").toURI().toString());
           } else {
-            spec.setInputDirURI(resolvedInputDirURI.toString());
+            final URI inputURI = resolvedInputDirURI.toURI();
+            batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputURI.toString());
+            URI outputURI =
+                inputURI.getPath().endsWith("/") ? inputURI.resolve("../segments") : inputURI.resolve("./segments");
+            batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputURI.toString());
           }
         }
-        IngestionJobLauncher.runIngestionJob(spec);
       }
-    } else {
-      LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
-          ingestionJobSpecFile.getAbsolutePath());
     }
-    return true;
+  }
+
+  private boolean waitForMinionTaskToFinish(long timeoutInMillis) {
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < timeoutInMillis) {
+      try {
+        Thread.sleep(500L);
+      } catch (InterruptedException e) {
+        // Swallow the exception
+      }
+      try {
+        final Map<String, String> taskStatesMap =
+            _minionClient.getTasksStates(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+        if (taskStatesMap.isEmpty()) {
+          LOGGER.info("No scheduled tasks yet, sleep 500 millis seconds");
+          continue;
+        }
+        boolean allCompleted = true;
+        for (String taskState : taskStatesMap.values()) {
+          if (!COMPLETED.equalsIgnoreCase(taskState)) {
+            allCompleted = false;
+            break;
+          }
+        }
+        if (allCompleted) {
+          LOGGER.info("All minion tasks are completed.");
+          return true;
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to query task endpoint", e);
+      }
+    }
+    return false;
   }
 }
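
The schedule-then-poll flow added above can be condensed as follows; a sketch assuming
only the two MinionClient calls this commit uses (host and port invented):

    import java.util.Map;
    import org.apache.pinot.common.minion.MinionClient;

    public class ScheduleAndPollSketch {
      public static void main(String[] args) throws Exception {
        MinionClient minionClient = new MinionClient("localhost", "9000");
        minionClient.scheduleMinionTasks();
        long deadline = System.currentTimeMillis() + 30_000L;
        while (System.currentTimeMillis() < deadline) {
          Thread.sleep(500L);
          Map<String, String> states = minionClient.getTasksStates("SegmentGenerationAndPushTask");
          // Keep polling until every scheduled task reports COMPLETED
          if (!states.isEmpty() && states.values().stream().allMatch("COMPLETED"::equalsIgnoreCase)) {
            System.out.println("All minion tasks are completed.");
            return;
          }
        }
        System.out.println("Timed out waiting for minion tasks.");
      }
    }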
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index 69f5941..475a8ec 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -43,6 +43,10 @@ public class Quickstart {
     }
   }
 
+  public String getBootstrapDataDir() {
+    return "examples/batch/baseballStats";
+  }
+
   public static void printStatus(Color color, String message) {
     System.out.println(color._code + message + Color.RESET._code);
   }
@@ -144,16 +148,17 @@ public class Quickstart {
     File dataFile = new File(dataDir, "baseballStats_data.csv");
 
     ClassLoader classLoader = Quickstart.class.getClassLoader();
-    URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
+    URL resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_schema.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, schemaFile);
-    resource = classLoader.getResource("examples/batch/baseballStats/rawdata/baseballStats_data.csv");
+    resource = classLoader.getResource(getBootstrapDataDir() + "/rawdata/baseballStats_data.csv");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, dataFile);
-    resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml");
-    com.google.common.base.Preconditions.checkNotNull(resource);
-    FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
-    resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json");
+    resource = classLoader.getResource(getBootstrapDataDir() + "/ingestionJobSpec.yaml");
+    if (resource != null) {
+      FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
+    }
+    resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_offline_table_config.json");
     com.google.common.base.Preconditions.checkNotNull(resource);
     FileUtils.copyURLToFile(resource, tableConfigFile);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index e2e3e38..072effd 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -53,6 +53,8 @@ public class QuickstartRunner {
   private static final int DEFAULT_SERVER_ADMIN_API_PORT = 7500;
   private static final int DEFAULT_BROKER_PORT = 8000;
   private static final int DEFAULT_CONTROLLER_PORT = 9000;
+  private static final int DEFAULT_MINION_PORT = 6000;
 
   private static final String DEFAULT_ZK_DIR = "PinotZkDir";
   private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir";
@@ -63,6 +65,7 @@ public class QuickstartRunner {
   private final int _numServers;
   private final int _numBrokers;
   private final int _numControllers;
+  private final int _numMinions;
   private final File _tempDir;
   private final boolean _enableTenantIsolation;
 
@@ -74,10 +77,17 @@ public class QuickstartRunner {
   public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers,
       int numControllers, File tempDir, boolean enableIsolation)
       throws Exception {
+    this(tableRequests, numServers, numBrokers, numControllers, 1, tempDir, enableIsolation);
+  }
+
+  public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers,
+      int numControllers, int numMinions, File tempDir, boolean enableIsolation)
+      throws Exception {
     _tableRequests = tableRequests;
     _numServers = numServers;
     _numBrokers = numBrokers;
     _numControllers = numControllers;
+    _numMinions = numMinions;
     _tempDir = tempDir;
     _enableTenantIsolation = enableIsolation;
     clean();
@@ -131,6 +141,16 @@ public class QuickstartRunner {
     }
   }
 
+  private void startMinions()
+      throws Exception {
+    for (int i = 0; i < _numMinions; i++) {
+      StartMinionCommand minionStarter = new StartMinionCommand();
+      minionStarter.setMinionPort(DEFAULT_MINION_PORT + i)
+          .setZkAddress(ZK_ADDRESS).setClusterName(CLUSTER_NAME);
+      minionStarter.execute();
+    }
+  }
+
   private void clean()
       throws Exception {
     FileUtils.cleanDirectory(_tempDir);
@@ -142,6 +162,7 @@ public class QuickstartRunner {
     startControllers();
     startBrokers();
     startServers();
+    startMinions();
   }
 
   public void stop()
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
index 4ba95bc..74cb81f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
@@ -110,4 +110,24 @@ public class StartMinionCommand extends AbstractBaseAdminCommand implements Comm
     }
     return PinotConfigUtils.generateMinionConf(_minionHost, _minionPort);
   }
+
+  public StartMinionCommand setMinionHost(String minionHost) {
+    _minionHost = minionHost;
+    return this;
+  }
+
+  public StartMinionCommand setMinionPort(int minionPort) {
+    _minionPort = minionPort;
+    return this;
+  }
+
+  public StartMinionCommand setZkAddress(String zkAddress) {
+    _zkAddress = zkAddress;
+    return this;
+  }
+
+  public StartMinionCommand setClusterName(String clusterName) {
+    _clusterName = clusterName;
+    return this;
+  }
 }
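
With the new setters, a minion can be started programmatically the same way
QuickstartRunner.startMinions() does; a minimal sketch (ZK address and cluster name
invented):

    import org.apache.pinot.tools.admin.command.StartMinionCommand;

    public class StartMinionSketch {
      public static void main(String[] args) throws Exception {
        StartMinionCommand minionStarter = new StartMinionCommand();
        minionStarter.setMinionPort(6000)          // DEFAULT_MINION_PORT in QuickstartRunner
            .setZkAddress("localhost:2181")        // invented ZK address
            .setClusterName("QuickStartCluster");  // invented cluster name
        minionStarter.execute();
      }
    }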

