You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/09 08:34:03 UTC
[incubator-pinot] 02/02: Adding pinot minion segment creation task
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f7c9ddb50d9e24cd0f8487162833c478f37a6329
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Dec 9 00:33:26 2020 -0800
Adding pinot minion segment creation task
---
.../SegmentGenerationAndPushTaskGenerator.java | 241 +++++++++++++++++++++
.../minion/generator/TaskGeneratorRegistry.java | 1 +
.../apache/pinot/core/common/MinionConstants.java | 6 +
pinot-minion/pom.xml | 5 +
.../executor/SegmentGenerationAndPushResult.java | 91 ++++++++
.../SegmentGenerationAndPushTaskExecutor.java | 187 ++++++++++++++++
...egmentGenerationAndPushTaskExecutorFactory.java | 8 +
.../executor/TaskExecutorFactoryRegistry.java | 1 +
.../pinot/tools/BatchQuickstartWithMinion.java | 35 +++
.../org/apache/pinot/tools/BootstrapTableTool.java | 136 ++++++++++--
.../java/org/apache/pinot/tools/Quickstart.java | 17 +-
.../tools/admin/command/QuickstartRunner.java | 21 ++
.../tools/admin/command/StartMinionCommand.java | 20 ++
13 files changed, 743 insertions(+), 26 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
new file mode 100644
index 0000000..0d05472
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/SegmentGenerationAndPushTaskGenerator.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Controller-side generator for SegmentGenerationAndPushTask: creates one minion task per input file found under
+ * the input directories declared in each OFFLINE table's batch ingestion configs.
+ */
+public class SegmentGenerationAndPushTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
+
+  private final ClusterInfoAccessor _clusterInfoAccessor;
+
+  public SegmentGenerationAndPushTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
+  }
+
+  /**
+   * Generates one SegmentGenerationAndPushTask per input file for every OFFLINE table in {@code tableConfigs}.
+   *
+   * <p>In APPEND mode, input files already recorded in an existing segment's custom metadata are skipped. At most
+   * {@link MinionConstants#TABLE_MAX_NUM_TASKS_KEY} tasks are generated per table per invocation, counted across all
+   * of the table's batch configs. A failure while processing one batch config is logged and does not prevent the
+   * other batch configs or tables from being processed.
+   *
+   * @param tableConfigs table configs to generate tasks for; non-OFFLINE tables are skipped
+   * @return the generated task configs (possibly empty)
+   * @throws NullPointerException if an OFFLINE table has no task config, or no config for this task type
+   */
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      // Only generate tasks for OFFLINE tables
+      String offlineTableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() != TableType.OFFLINE) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkNotNull(tableTaskConfig);
+      Map<String, String> taskConfigs =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      // Guava's Preconditions formats messages with %s placeholders, not SLF4J-style {}
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", offlineTableName);
+
+      // Without batch configs there is nothing to ingest; skip instead of failing every table with an NPE
+      BatchIngestionConfig batchIngestionConfig =
+          tableConfig.getIngestionConfig() != null ? tableConfig.getIngestionConfig().getBatchIngestionConfig() : null;
+      if (batchIngestionConfig == null || batchIngestionConfig.getBatchConfigMaps() == null) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask for table: {} without batch ingestion configs",
+            offlineTableName);
+        continue;
+      }
+
+      int tableMaxNumTasks = getTableMaxNumTasks(taskConfigs, offlineTableName);
+      String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+      // Shared by all batch configs so that tableMaxNumTasks caps the whole table, not each batch config
+      int tableNumTasks = 0;
+      for (Map<String, String> batchConfigMap : batchIngestionConfig.getBatchConfigMaps()) {
+        if (tableNumTasks >= tableMaxNumTasks) {
+          break;
+        }
+        try {
+          URI inputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+          URI outputDirURI = getDirectoryUri(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI));
+
+          List<OfflineSegmentZKMetadata> offlineSegmentsMetadata = Collections.emptyList();
+          // For append mode, we don't create segments for input file URIs already created.
+          if (BatchConfigProperties.SegmentIngestionType.APPEND.name().equalsIgnoreCase(batchSegmentIngestionType)) {
+            offlineSegmentsMetadata = _clusterInfoAccessor.getOfflineSegmentsMetadata(offlineTableName);
+          }
+          List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI,
+              getExistingSegmentInputFiles(offlineSegmentsMetadata));
+          if (inputFileURIs.isEmpty()) {
+            continue;
+          }
+
+          String pushMode = IngestionConfigUtils.getPushMode(batchConfigMap);
+          // Schema and table config are identical for every file of this table: fetch and serialize them once
+          String schemaJson = JsonUtils.objectToString(_clusterInfoAccessor.getTableSchema(offlineTableName));
+          String tableConfigJson = JsonUtils.objectToString(_clusterInfoAccessor.getTableConfig(offlineTableName));
+          for (URI inputFileURI : inputFileURIs) {
+            // Generate up to tableMaxNumTasks tasks each time for each table
+            if (tableNumTasks >= tableMaxNumTasks) {
+              break;
+            }
+            Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(batchConfigMap);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.INPUT_FILE_URI, inputFileURI.toString());
+            URI outputSegmentDirURI = getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI);
+            singleFileGenerationTaskConfig
+                .put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.SCHEMA, schemaJson);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.TABLE_CONFIGS, tableConfigJson);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(tableNumTasks));
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+                BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);
+            singleFileGenerationTaskConfig
+                .put(BatchConfigProperties.PUSH_CONTROLLER_URI, _clusterInfoAccessor.getVipUrl());
+            pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+                singleFileGenerationTaskConfig));
+            tableNumTasks++;
+          }
+        } catch (Exception e) {
+          LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
+              tableConfig, taskConfigs, e);
+        }
+      }
+    }
+    return pinotTaskConfigs;
+  }
+
+  /**
+   * Reads the per-table task cap from {@link MinionConstants#TABLE_MAX_NUM_TASKS_KEY}.
+   *
+   * @return the configured cap, or {@link Integer#MAX_VALUE} (no cap) when absent or not a valid integer
+   */
+  private static int getTableMaxNumTasks(Map<String, String> taskConfigs, String tableName) {
+    String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+    if (tableMaxNumTasksConfig == null) {
+      return Integer.MAX_VALUE;
+    }
+    try {
+      return Integer.parseInt(tableMaxNumTasksConfig.trim());
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Invalid value: {} for config: {} of table: {}, generating tasks without limit",
+          tableMaxNumTasksConfig, MinionConstants.TABLE_MAX_NUM_TASKS_KEY, tableName);
+      return Integer.MAX_VALUE;
+    }
+  }
+
+  /**
+   * Lists the files (recursively) under {@code inputDirURI} that pass the include/exclude file name patterns of the
+   * batch config, skipping directories and files already present in {@code existingSegmentInputFileURIs}.
+   *
+   * <p>If the directory's URI scheme has no registered PinotFS yet, it is registered from the batch config's input
+   * FS class and FS properties. Returns an empty list when the directory cannot be listed.
+   */
+  private List<URI> getInputFilesFromDirectory(Map<String, String> batchConfigMap, URI inputDirURI,
+      Set<String> existingSegmentInputFileURIs) {
+    String inputDirURIScheme = inputDirURI.getScheme();
+    if (!PinotFSFactory.isSchemeSupported(inputDirURIScheme)) {
+      String fsClass = batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+      PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(batchConfigMap);
+      PinotFSFactory.register(inputDirURIScheme, fsClass, fsProps);
+    }
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURIScheme);
+
+    // Get list of files to process
+    String[] files;
+    try {
+      files = inputDirFS.listFiles(inputDirURI, true);
+    } catch (IOException e) {
+      LOGGER.error("Unable to list files under URI: " + inputDirURI, e);
+      return Collections.emptyList();
+    }
+    PathMatcher includeFilePathMatcher =
+        getPathMatcher(batchConfigMap.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN));
+    PathMatcher excludeFilePathMatcher =
+        getPathMatcher(batchConfigMap.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN));
+
+    List<URI> inputFileURIs = new ArrayList<>();
+    for (String file : files) {
+      try {
+        java.nio.file.Path filePath = Paths.get(file);
+        if (includeFilePathMatcher != null && !includeFilePathMatcher.matches(filePath)) {
+          continue;
+        }
+        if (excludeFilePathMatcher != null && excludeFilePathMatcher.matches(filePath)) {
+          continue;
+        }
+        URI inputFileURI = new URI(file);
+        if (inputFileURI.getScheme() == null) {
+          inputFileURI = new File(file).toURI();
+        }
+        // Skip directories and files that were already the input of an existing segment
+        if (inputDirFS.isDirectory(inputFileURI) || existingSegmentInputFileURIs.contains(inputFileURI.toString())) {
+          continue;
+        }
+        inputFileURIs.add(inputFileURI);
+      } catch (Exception e) {
+        // Best effort: one unreadable entry must not prevent the remaining files from being processed
+        LOGGER.warn("Skipping input file: {} which could not be processed", file, e);
+      }
+    }
+    return inputFileURIs;
+  }
+
+  /**
+   * Returns a matcher for a {@code "glob:"}/{@code "regex:"} pattern, or {@code null} when no pattern is configured.
+   */
+  private static PathMatcher getPathMatcher(String pattern) {
+    return pattern != null ? FileSystems.getDefault().getPathMatcher(pattern) : null;
+  }
+
+  /**
+   * Collects the input data file URIs recorded under {@link BatchConfigProperties#INPUT_DATA_FILE_URI_KEY} in the
+   * custom map of the given segments' metadata.
+   */
+  private Set<String> getExistingSegmentInputFiles(List<OfflineSegmentZKMetadata> offlineSegmentsMetadata) {
+    Set<String> existingSegmentInputFiles = new HashSet<>();
+    for (OfflineSegmentZKMetadata metadata : offlineSegmentsMetadata) {
+      Map<String, String> customMap = metadata.getCustomMap();
+      if (customMap != null && customMap.containsKey(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY)) {
+        existingSegmentInputFiles.add(customMap.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
+      }
+    }
+    return existingSegmentInputFiles;
+  }
+
+  /**
+   * Parses a directory URI string; a value without a scheme is treated as a local file path.
+   *
+   * @throws URISyntaxException if {@code uriStr} is not a valid URI
+   */
+  private URI getDirectoryUri(String uriStr)
+      throws URISyntaxException {
+    URI uri = new URI(uriStr);
+    if (uri.getScheme() == null) {
+      uri = new File(uriStr).toURI();
+    }
+    return uri;
+  }
+
+  /**
+   * Maps an input file to its output directory: the file's parent path relative to {@code baseInputDir} is resolved
+   * against {@code outputDir}. The returned URI ends with "/".
+   *
+   * @throws IllegalStateException if {@code inputFile} is not located under {@code baseInputDir}
+   */
+  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + baseInputDir);
+    String outputDirStr = outputDir.toString();
+    outputDir = !outputDirStr.endsWith("/") ? URI.create(outputDirStr.concat("/")) : outputDir;
+    // resolve(".") drops the file name, leaving the directory that will hold the segment
+    return outputDir.resolve(relativePath).resolve(".");
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index f112d8b..21070d3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -36,6 +36,7 @@ public class TaskGeneratorRegistry {
/**
 * Creates the registry and registers the ConvertToRawIndex, RealtimeToOfflineSegments and SegmentGenerationAndPush
 * task generators, each constructed with the given {@link ClusterInfoAccessor}.
 */
public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) {
registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
+ registerTaskGenerator(new SegmentGenerationAndPushTaskGenerator(clusterInfoAccessor));
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index cd98833..546a9fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -88,4 +88,10 @@ public class MinionConstants {
public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
}
+
+  /**
+   * Constants for the minion task that generates segments based on batch ingestion configs and pushes them to the
+   * controller.
+   */
+  public static class SegmentGenerationAndPushTask {
+    /** Task type name under which the generator and the executor are registered. */
+    public static final String TASK_TYPE = "SegmentGenerationAndPushTask";
+  }
+
}
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index cc3608f..f9a442c 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -81,6 +81,11 @@
<artifactId>pinot-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-batch-ingestion-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
<exclusions>
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java
new file mode 100644
index 0000000..d1cabcb
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushResult.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+
+
+/**
+ * The class <code>SegmentGenerationAndPushResult</code> wraps the segment generation and push
+ * results: a success flag, the generated segment name, an optional failure cause and arbitrary
+ * custom properties. Instances are created through {@link Builder}.
+ */
+public class SegmentGenerationAndPushResult {
+  private final boolean _succeed;
+  private final String _segmentName;
+  private final Exception _exception;
+  private final Map<String, Object> _customProperties;
+
+  private SegmentGenerationAndPushResult(boolean succeed, String segmentName, Exception exception,
+      Map<String, Object> customProperties) {
+    _succeed = succeed;
+    _segmentName = segmentName;
+    _exception = exception;
+    // Copy so that later calls on the (mutable, reusable) Builder cannot change an already built result
+    _customProperties = new HashMap<>(customProperties);
+  }
+
+  /**
+   * Returns the custom property stored under {@code key}, or {@code null} if there is none.
+   * The value is cast unchecked to the type expected by the caller.
+   */
+  @SuppressWarnings("unchecked")
+  public <T> T getCustomProperty(String key) {
+    return (T) _customProperties.get(key);
+  }
+
+  /** Returns the exception recorded through the builder, or {@code null} if none was set. */
+  public Exception getException() {
+    return _exception;
+  }
+
+  /** Returns the success flag set through the builder ({@code false} by default). */
+  public boolean isSucceed() {
+    return _succeed;
+  }
+
+  /** Returns the segment name set through the builder, or {@code null} if none was set. */
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /** Builder for {@link SegmentGenerationAndPushResult}; every setter returns {@code this} for chaining. */
+  public static class Builder {
+    private boolean _succeed;
+    private String _segmentName;
+    private Exception _exception;
+    private final Map<String, Object> _customProperties = new HashMap<>();
+
+    public Builder setSucceed(boolean succeed) {
+      _succeed = succeed;
+      return this;
+    }
+
+    public Builder setSegmentName(String segmentName) {
+      _segmentName = segmentName;
+      return this;
+    }
+
+    public Builder setException(Exception exception) {
+      _exception = exception;
+      return this;
+    }
+
+    public Builder setCustomProperty(String key, Object property) {
+      _customProperties.put(key, property);
+      return this;
+    }
+
+    public SegmentGenerationAndPushResult build() {
+      return new SegmentGenerationAndPushResult(_succeed, _segmentName, _exception, _customProperties);
+    }
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
new file mode 100644
index 0000000..0edb584
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutor.java
@@ -0,0 +1,187 @@
+package org.apache.pinot.minion.executor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.DataSizeUtils;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskExecutor.class);
+
+ private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
+
+ @Override
+ public Object executeTask(PinotTaskConfig pinotTaskConfig)
+ throws Exception {
+ Map<String, String> taskConfigs = pinotTaskConfig.getConfigs();
+ SegmentGenerationAndPushResult.Builder resultBuilder = new SegmentGenerationAndPushResult.Builder();
+ File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
+ try {
+ // Generate Pinot Segment
+ SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+ URI inputFileURI = URI.create(taskConfigs.get(BatchConfigProperties.INPUT_FILE_URI));
+ File localInputTempDir = new File(localTempDir, "input");
+ FileUtils.forceMkdir(localInputTempDir);
+ File localOutputTempDir = new File(localTempDir, "output");
+ FileUtils.forceMkdir(localOutputTempDir);
+ String inputFileURIScheme = inputFileURI.getScheme();
+ if (inputFileURIScheme == null) {
+ inputFileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+ }
+ if (!PinotFSFactory.isSchemeSupported(inputFileURIScheme)) {
+ String fsClass = taskConfigs.get(BatchConfigProperties.INPUT_FS_CLASS);
+ PinotConfiguration fsProps = IngestionConfigUtils.getFsProps(taskConfigs);
+ PinotFSFactory.register(inputFileURIScheme, fsClass, fsProps);
+ }
+ PinotFS inputFileFS = PinotFSFactory.create(inputFileURIScheme);
+ URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+ String outputURIScheme = outputSegmentDirURI.getScheme();
+ if (outputURIScheme == null) {
+ outputURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+ }
+ PinotFS outputFileFS = PinotFSFactory.create(outputURIScheme);
+ //copy input path to local
+ File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
+ inputFileFS.copyToLocalFile(inputFileURI, localInputDataFile);
+ taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+ taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+
+ RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
+ recordReaderSpec.setDataFormat(taskConfigs.get(BatchConfigProperties.INPUT_FORMAT));
+ recordReaderSpec.setClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CLASS));
+ recordReaderSpec.setConfigClassName(taskConfigs.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
+ taskSpec.setRecordReaderSpec(recordReaderSpec);
+ Schema schema;
+ if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA)) {
+ schema = JsonUtils
+ .stringToObject(JsonUtils.objectToString(taskConfigs.get(BatchConfigProperties.SCHEMA)), Schema.class);
+ } else if (taskConfigs.containsKey(BatchConfigProperties.SCHEMA_URI)) {
+ schema = SegmentGenerationUtils.getSchema(taskConfigs.get(BatchConfigProperties.SCHEMA_URI));
+ } else {
+ throw new RuntimeException(
+ "Missing schema for segment generation job: please set `schema` or `schemaURI` in task config.");
+ }
+ taskSpec.setSchema(schema);
+ JsonNode tableConfig = JsonUtils.stringToJsonNode(taskConfigs.get(BatchConfigProperties.TABLE_CONFIGS));
+ taskSpec.setTableConfig(tableConfig);
+ taskSpec.setSequenceId(Integer.parseInt(taskConfigs.get(BatchConfigProperties.SEQUENCE_ID)));
+ SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
+ segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
+ segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
+ .getConfigMapWithPrefix(taskConfigs, BatchConfigProperties.SEGMENT_NAME_GENERATOR_CONFIGS));
+ taskSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
+ taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
+ SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+ String segmentName = taskRunner.run();
+ // Tar segment directory to compress file
+ File localSegmentDir = new File(localOutputTempDir, segmentName);
+ String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+ File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+ LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+ //move segment to output PinotFS
+ URI outputSegmentTarURI = URI.create(outputSegmentDirURI + segmentTarFileName);
+ if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS
+ .exists(outputSegmentDirURI)) {
+ LOGGER.warn("Not overwrite existing output segment tar file: {}", outputFileFS.exists(outputSegmentDirURI));
+ } else {
+ outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+ }
+ resultBuilder.setSegmentName(segmentName);
+ // Segment push task
+ //Get list of files to process
+ String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ pushJobSpec.setPushAttempts(5);
+ pushJobSpec.setPushParallelism(1);
+ pushJobSpec.setPushRetryIntervalMillis(1000);
+ pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+ pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+ SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+ spec.setPushJobSpec(pushJobSpec);
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName(tableConfig.get(BatchConfigProperties.TABLE_NAME).asText());
+ spec.setTableSpec(tableSpec);
+ PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+ pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+ PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+ spec.setPinotClusterSpecs(pinotClusterSpecs);
+ switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+ case TAR:
+ try {
+ SegmentPushUtils.pushSegments(spec, LOCAL_PINOT_FS, Arrays.asList(outputSegmentTarURI.toString()));
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ case URI:
+ try {
+ List<String> segmentUris = new ArrayList<>();
+ URI updatedURI = SegmentPushUtils
+ .generateSegmentTarURI(outputSegmentDirURI, outputSegmentTarURI, pushJobSpec.getSegmentUriPrefix(),
+ pushJobSpec.getSegmentUriSuffix());
+ segmentUris.add(updatedURI.toString());
+ SegmentPushUtils.sendSegmentUris(spec, segmentUris);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ case METADATA:
+ try {
+ Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+ .getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec.getSegmentUriPrefix(),
+ pushJobSpec.getSegmentUriSuffix(), new String[]{outputSegmentTarURI.toString()});
+ SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ default:
+ throw new Exception("Unrecognized push mode - " + pushMode);
+ }
+ resultBuilder.setSucceed(true);
+ } catch (Exception e) {
+ resultBuilder.setException(e);
+ } finally {
+ // Cleanup output dir
+ FileUtils.deleteQuietly(localTempDir);
+ }
+ return resultBuilder.build();
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
new file mode 100644
index 0000000..e4b6447
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/SegmentGenerationAndPushTaskExecutorFactory.java
@@ -0,0 +1,8 @@
+package org.apache.pinot.minion.executor;
+
+public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExecutorFactory {
+ @Override
+ public PinotTaskExecutor create() {
+ return new SegmentGenerationAndPushTaskExecutor();
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 1b783dc..a86a39b 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -38,6 +38,7 @@ public class TaskExecutorFactoryRegistry {
registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
+ registerTaskExecutorFactory(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, new SegmentGenerationAndPushTaskExecutorFactory());
}
/**
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java
new file mode 100644
index 0000000..9dc11b6
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BatchQuickstartWithMinion.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import org.apache.pinot.spi.plugin.PluginManager;
+
+
+/**
+ * Variant of {@link Quickstart} whose bootstrap data directory is {@code examples/minions/batch/baseballStats}.
+ * NOTE(review): assumes Quickstart reads {@link #getBootstrapDataDir()} when bootstrapping tables; confirm.
+ */
+public class BatchQuickstartWithMinion extends Quickstart {
+  private static final String BOOTSTRAP_DATA_DIR = "examples/minions/batch/baseballStats";
+
+  public String getBootstrapDataDir() {
+    return BOOTSTRAP_DATA_DIR;
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    // Initialize the plugin manager before running the quickstart
+    PluginManager.get().init();
+    BatchQuickstartWithMinion quickstart = new BatchQuickstartWithMinion();
+    quickstart.execute();
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 0723b06..fa70e8a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -21,12 +21,24 @@ package org.apache.pinot.tools;
import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.Reader;
+import java.net.URI;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.minion.MinionClient;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.tools.admin.command.AddTableCommand;
import org.apache.pinot.tools.utils.JarUtils;
import org.slf4j.Logger;
@@ -36,9 +48,11 @@ import org.yaml.snakeyaml.Yaml;
public class BootstrapTableTool {
private static final Logger LOGGER = LoggerFactory.getLogger(BootstrapTableTool.class);
+ private static final String COMPLETED = "COMPLETED";
private final String _controllerHost;
private final int _controllerPort;
private final String _tableDir;
+ private final MinionClient _minionClient;
public BootstrapTableTool(String controllerHost, int controllerPort, String tableDir) {
Preconditions.checkNotNull(controllerHost);
@@ -46,11 +60,13 @@ public class BootstrapTableTool {
_controllerHost = controllerHost;
_controllerPort = controllerPort;
_tableDir = tableDir;
+ _minionClient = new MinionClient(controllerHost, String.valueOf(controllerPort));
}
public boolean execute()
throws Exception {
File setupTableTmpDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
+ setupTableTmpDir.mkdirs();
File tableDir = new File(_tableDir);
String tableName = tableDir.getName();
@@ -95,41 +111,121 @@ public class BootstrapTableTool {
.setTableConfigFile(tableConfigFile.getAbsolutePath()).setControllerHost(_controllerHost)
.setControllerPort(String.valueOf(_controllerPort)).setExecute(true).execute();
}
+
+ /**
+ * Creates an offline table and loads its data.
+ *
+ * <p>If the table config has a batch ingestion config, its batch configs are first rewritten by
+ * {@code updatedTableConfig}. The resulting config is written to a file under {@code setupTableTmpDir}, and that
+ * file is used to create the table. If the table config has a task config, minion tasks are scheduled and this
+ * method waits up to 30 seconds for the segment generation and push tasks to complete. Finally, if an ingestion
+ * job spec file exists, a standalone ingestion job is launched with it.
+ *
+ * @return always {@code true}; failures are reported by exceptions
+ * @throws RuntimeException if the table cannot be created
+ */
private boolean bootstrapOfflineTable(File setupTableTmpDir, String tableName, File schemaFile,
File offlineTableConfigFile, File ingestionJobSpecFile)
throws Exception {
- LOGGER.info("Adding offline table: {}", tableName);
- boolean tableCreationResult = createTable(schemaFile, offlineTableConfigFile);
+ TableConfig tableConfig;
+ try (FileInputStream inputStream = new FileInputStream(offlineTableConfigFile)) {
+ tableConfig = JsonUtils.inputStreamToObject(inputStream, TableConfig.class);
+ }
+ if (tableConfig.getIngestionConfig() != null
+ && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null) {
+ updatedTableConfig(tableConfig, tableName, setupTableTmpDir);
+ }
+ LOGGER.info("Adding offline table: {}", tableName);
+ // Create the table from the (possibly rewritten) config rather than from the original file.
+ File updatedTableConfigFile =
+ new File(setupTableTmpDir, String.format("%s_%d.config", tableName, System.currentTimeMillis()));
+ try (FileOutputStream outputStream = new FileOutputStream(updatedTableConfigFile)) {
+ outputStream.write(JsonUtils.objectToPrettyString(tableConfig).getBytes(StandardCharsets.UTF_8));
+ }
+ boolean tableCreationResult = createTable(schemaFile, updatedTableConfigFile);
if (!tableCreationResult) {
throw new RuntimeException(String
.format("Unable to create offline table - %s from schema file [%s] and table conf file [%s].", tableName,
schemaFile, offlineTableConfigFile));
}
+ if (tableConfig.getTaskConfig() != null) {
+ long minionTaskTimeoutMs = 30_000L;
+ _minionClient.scheduleMinionTasks();
+ if (!waitForMinionTaskToFinish(minionTaskTimeoutMs)) {
+ LOGGER.warn("Minion tasks for table {} did not all complete within {} ms", tableName, minionTaskTimeoutMs);
+ }
+ }
+ if (ingestionJobSpecFile != null) {
+ if (ingestionJobSpecFile.exists()) {
+ LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
+ tableName, _controllerHost, _controllerPort);
+ try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
+ SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
+ String inputDirURI = spec.getInputDirURI();
+ if (!new File(inputDirURI).exists()) {
+ URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+ // Fail with a descriptive message instead of a NullPointerException on getProtocol() below.
+ Preconditions.checkState(resolvedInputDirURI != null,
+ "Unable to find input directory [%s] on the file system or the classpath", inputDirURI);
+ if ("jar".equals(resolvedInputDirURI.getProtocol())) {
+ // Resources packaged in a jar cannot be read as a directory, so copy them out to the temp dir first.
+ String[] splits = resolvedInputDirURI.getFile().split("!");
+ String inputDir = new File(setupTableTmpDir, "inputData").toString();
+ JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
+ spec.setInputDirURI(inputDir);
+ } else {
+ spec.setInputDirURI(resolvedInputDirURI.toString());
+ }
+ }
+ IngestionJobLauncher.runIngestionJob(spec);
+ }
+ } else {
+ LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
+ ingestionJobSpecFile.getAbsolutePath());
+ }
+ }
+ return true;
+ }
- if (ingestionJobSpecFile.exists()) {
- LOGGER.info("Launch data ingestion job to build index segment for table {} and push to controller [{}:{}]",
- tableName, _controllerHost, _controllerPort);
- try (Reader reader = new BufferedReader(new FileReader(ingestionJobSpecFile.getAbsolutePath()))) {
- SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class);
- String inputDirURI = spec.getInputDirURI();
- if (!new File(inputDirURI).exists()) {
- URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
- if (resolvedInputDirURI.getProtocol().equals("jar")) {
+ /**
+ * Rewrites the input and output directory URIs of each batch config in the table's batch ingestion config, by
+ * mutating the maps returned from {@code getBatchConfigMaps()}.
+ *
+ * <p>Batch configs whose input directory exists on the local file system are left as they are. For the others,
+ * the directory is looked up on the classpath:
+ * <ul>
+ * <li>If it lives inside a jar, its contents are copied to {@code setupTableTmpDir/inputData}. The input URI then
+ * points there, and the output URI points to {@code setupTableTmpDir/segments}.</li>
+ * <li>Otherwise the resolved URI becomes the input URI, and a sibling {@code segments} directory becomes the
+ * output URI.</li>
+ * </ul>
+ * Input directories found neither on disk nor on the classpath are logged and left unchanged.
+ */
+ private void updatedTableConfig(TableConfig tableConfig, String tableName, File setupTableTmpDir)
+ throws Exception {
+ final List<Map<String, String>> batchConfigsMaps =
+ tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
+ if (batchConfigsMaps == null) {
+ // Nothing to rewrite; avoids a NullPointerException when no batch configs are present.
+ return;
+ }
+ for (Map<String, String> batchConfigsMap : batchConfigsMaps) {
+ BatchConfig batchConfig = new BatchConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName), batchConfigsMap);
+ String inputDirURI = batchConfig.getInputDirURI();
+ if (!new File(inputDirURI).exists()) {
+ URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
+ if (resolvedInputDirURI != null) {
+ if ("jar".equals(resolvedInputDirURI.getProtocol())) {
String[] splits = resolvedInputDirURI.getFile().split("!");
- String inputDir = new File(setupTableTmpDir, "inputData").toString();
- JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir);
- spec.setInputDirURI(inputDir);
+ // Resources packaged in a jar cannot be read as a directory, so copy them out to the temp dir first.
+ File inputDir = new File(setupTableTmpDir, "inputData");
+ JarUtils.copyResourcesToDirectory(splits[0], splits[1].substring(1), inputDir.toString());
+ batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir.toURI().toString());
+ batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
+ new File(inputDir.getParent(), "segments").toURI().toString());
} else {
- spec.setInputDirURI(resolvedInputDirURI.toString());
+ final URI inputURI = resolvedInputDirURI.toURI();
+ batchConfigsMap.put(BatchConfigProperties.INPUT_DIR_URI, inputURI.toString());
+ // Place the output next to the input directory whether or not the input URI ends with a slash.
+ URI outputURI =
+ inputURI.getPath().endsWith("/") ? inputURI.resolve("../segments") : inputURI.resolve("./segments");
+ batchConfigsMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputURI.toString());
}
- }
+ } else {
+ LOGGER.warn("Input directory [{}] for table {} not found on the file system or the classpath, skipping",
+ inputDirURI, tableName);
+ }
- IngestionJobLauncher.runIngestionJob(spec);
}
- } else {
- LOGGER.info("Not found ingestionJobSpec.yaml at location [{}], skipping data ingestion",
- ingestionJobSpecFile.getAbsolutePath());
}
- return true;
+ }
+
+ /**
+ * Polls the controller every 500 ms until every {@code SegmentGenerationAndPushTask} reports the state
+ * {@code COMPLETED} (case-insensitive), or until the timeout elapses.
+ *
+ * <p>Polls that return no tasks, or that fail, are logged and retried until the timeout.
+ *
+ * @param timeoutInMillis maximum time to wait, in milliseconds
+ * @return {@code true} if all tasks completed in time; {@code false} on timeout or if the waiting thread was
+ * interrupted (the interrupt flag is restored in that case)
+ */
+ private boolean waitForMinionTaskToFinish(long timeoutInMillis) {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < timeoutInMillis) {
+ try {
+ Thread.sleep(500L);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers can observe it, and stop waiting.
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for minion tasks to finish");
+ return false;
+ }
+ final Map<String, String> taskStatesMap;
+ try {
+ taskStatesMap = _minionClient.getTasksStates(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+ } catch (Exception e) {
+ LOGGER.error("Failed to query task endpoint", e);
+ continue;
+ }
+ if (taskStatesMap == null || taskStatesMap.isEmpty()) {
+ LOGGER.info("No scheduled tasks yet, sleep 500 milliseconds");
+ continue;
+ }
+ if (taskStatesMap.values().stream().allMatch(COMPLETED::equalsIgnoreCase)) {
+ LOGGER.info("All minion tasks are completed.");
+ return true;
+ }
+ }
+ return false;
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
index 69f5941..475a8ec 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/Quickstart.java
@@ -43,6 +43,10 @@ public class Quickstart {
}
}
+ /**
+ * Returns the classpath resource directory from which this quickstart copies the baseballStats schema, raw data,
+ * offline table config and (optional) ingestion job spec. Subclasses override it to bootstrap from a different
+ * example directory.
+ */
+ public String getBootstrapDataDir() {
+ return "examples/batch/baseballStats";
+ }
+
public static void printStatus(Color color, String message) {
System.out.println(color._code + message + Color.RESET._code);
}
@@ -144,16 +148,17 @@ public class Quickstart {
File dataFile = new File(dataDir, "baseballStats_data.csv");
ClassLoader classLoader = Quickstart.class.getClassLoader();
- URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json");
+ URL resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_schema.json");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, schemaFile);
- resource = classLoader.getResource("examples/batch/baseballStats/rawdata/baseballStats_data.csv");
+ resource = classLoader.getResource(getBootstrapDataDir() + "/rawdata/baseballStats_data.csv");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, dataFile);
- resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml");
- com.google.common.base.Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
- resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json");
+ resource = classLoader.getResource(getBootstrapDataDir() + "/ingestionJobSpec.yaml");
+ if (resource != null) {
+ FileUtils.copyURLToFile(resource, ingestionJobSpecFile);
+ }
+ resource = classLoader.getResource(getBootstrapDataDir() + "/baseballStats_offline_table_config.json");
com.google.common.base.Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index e2e3e38..072effd 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -53,6 +53,8 @@ public class QuickstartRunner {
private static final int DEFAULT_SERVER_ADMIN_API_PORT = 7500;
private static final int DEFAULT_BROKER_PORT = 8000;
private static final int DEFAULT_CONTROLLER_PORT = 9000;
+ private static final int DEFAULT_MINION_PORT = 6000;
+
private static final String DEFAULT_ZK_DIR = "PinotZkDir";
private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir";
@@ -63,6 +65,7 @@ public class QuickstartRunner {
private final int _numServers;
private final int _numBrokers;
private final int _numControllers;
+ private final int _numMinions;
private final File _tempDir;
private final boolean _enableTenantIsolation;
@@ -74,10 +77,17 @@ public class QuickstartRunner {
public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers,
int numControllers, File tempDir, boolean enableIsolation)
throws Exception {
+ this(tableRequests, numServers, numBrokers, numControllers, 1, tempDir, enableIsolation);
+ }
+
+ public QuickstartRunner(List<QuickstartTableRequest> tableRequests, int numServers, int numBrokers,
+ int numControllers, int numMinions, File tempDir, boolean enableIsolation)
+ throws Exception {
_tableRequests = tableRequests;
_numServers = numServers;
_numBrokers = numBrokers;
_numControllers = numControllers;
+ _numMinions = numMinions;
_tempDir = tempDir;
_enableTenantIsolation = enableIsolation;
clean();
@@ -131,6 +141,16 @@ public class QuickstartRunner {
}
}
+ /**
+ * Starts {@code _numMinions} minions against the quickstart ZooKeeper address and cluster name. Ports are
+ * consecutive, starting at {@code DEFAULT_MINION_PORT}.
+ */
+ private void startMinions()
+ throws Exception {
+ for (int minionIndex = 0; minionIndex < _numMinions; minionIndex++) {
+ int minionPort = DEFAULT_MINION_PORT + minionIndex;
+ new StartMinionCommand().setMinionPort(minionPort).setZkAddress(ZK_ADDRESS).setClusterName(CLUSTER_NAME)
+ .execute();
+ }
+ }
+
private void clean()
throws Exception {
FileUtils.cleanDirectory(_tempDir);
@@ -142,6 +162,7 @@ public class QuickstartRunner {
startControllers();
startBrokers();
startServers();
+ startMinions();
}
public void stop()
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
index 4ba95bc..74cb81f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartMinionCommand.java
@@ -110,4 +110,24 @@ public class StartMinionCommand extends AbstractBaseAdminCommand implements Comm
}
return PinotConfigUtils.generateMinionConf(_minionHost, _minionPort);
}
+
+ /**
+ * Sets the minion host used when generating the minion config.
+ *
+ * @param minionHost host name of the minion
+ * @return this command, for call chaining
+ */
+ public StartMinionCommand setMinionHost(String minionHost) {
+ this._minionHost = minionHost;
+ return this;
+ }
+
+ /**
+ * Sets the minion port used when generating the minion config.
+ *
+ * @param minionPort port of the minion
+ * @return this command, for call chaining
+ */
+ public StartMinionCommand setMinionPort(int minionPort) {
+ this._minionPort = minionPort;
+ return this;
+ }
+
+ /**
+ * Sets the ZooKeeper address the minion connects to.
+ *
+ * @param zkAddress ZooKeeper address
+ * @return this command, for call chaining
+ */
+ public StartMinionCommand setZkAddress(String zkAddress) {
+ this._zkAddress = zkAddress;
+ return this;
+ }
+
+ /**
+ * Sets the name of the cluster the minion joins.
+ *
+ * @param clusterName cluster name
+ * @return this command, for call chaining
+ */
+ public StartMinionCommand setClusterName(String clusterName) {
+ this._clusterName = clusterName;
+ return this;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org