You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/30 07:01:57 UTC
[incubator-pinot] 01/01: Adding InsertData sub command for
pinot-admin
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch adding_InsertData_sub_command
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5985dc5c0eeaec0516cf8cd33b251c9617cd6540
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Dec 29 16:21:53 2020 -0800
Adding InsertData sub command for pinot-admin
---
.../helix/ControllerRequestURLBuilder.java | 3 +
.../batch/common/SegmentGenerationTaskRunner.java | 7 +
pinot-tools/pom.xml | 16 +-
.../pinot/tools/admin/PinotAdministrator.java | 2 +
.../tools/admin/command/ImportDataCommand.java | 340 +++++++++++++++++++++
5 files changed, 362 insertions(+), 6 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index 5d3e297..333d1e6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -211,6 +211,9 @@ public class ControllerRequestURLBuilder {
}
return url;
}
+ /**
+ * Returns the URL used to fetch the schema of the given table, built by joining the base URL, "tables",
+ * the table name and "schema" with "/" as the separator.
+ *
+ * @param tableName table whose schema URL is requested; it is placed into the path as given (no encoding is done here)
+ * @return the joined URL string
+ */
+ public String forTableSchemaGet(String tableName) {
+ return StringUtil.join("/", _baseUrl, "tables", tableName, "schema");
+ }
public String forSchemaCreate() {
return StringUtil.join("/", _baseUrl, "schemas");
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 2bd0762..7b9a5ef 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.core.segment.name.SegmentNameGenerator;
import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
@@ -44,9 +45,13 @@ import org.apache.pinot.spi.utils.JsonUtils;
public class SegmentGenerationTaskRunner implements Serializable {
+ public static final String FIXED_SEGMENT_NAME_GENERATOR = "fixed";
public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate";
+ // For FixedSegmentNameGenerator
+ public static final String SEGMENT_NAME = "segment.name";
+
// For SimpleSegmentNameGenerator
public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
@@ -123,6 +128,8 @@ public class SegmentGenerationTaskRunner implements Serializable {
segmentNameGeneratorConfigs = new HashMap<>();
}
switch (segmentNameGeneratorType) {
+ case FIXED_SEGMENT_NAME_GENERATOR:
+ return new FixedSegmentNameGenerator(segmentNameGeneratorConfigs.get(SEGMENT_NAME));
case SIMPLE_SEGMENT_NAME_GENERATOR:
return new SimpleSegmentNameGenerator(tableName, segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX));
case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 4dd0a16..25b6516 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -57,6 +57,11 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-batch-ingestion-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
<artifactId>pinot-avro</artifactId>
<version>${project.version}</version>
</dependency>
@@ -67,6 +72,11 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-protobuf</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
<artifactId>pinot-json</artifactId>
<version>${project.version}</version>
</dependency>
@@ -92,12 +102,6 @@
<scope>runtime</scope>
</dependency>
<dependency>
- <groupId>org.apache.pinot</groupId>
- <artifactId>pinot-batch-ingestion-standalone</artifactId>
- <version>${project.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index d1fc6df..4e0bf65 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.tools.admin.command.CreateSegmentCommand;
import org.apache.pinot.tools.admin.command.DeleteClusterCommand;
import org.apache.pinot.tools.admin.command.GenerateDataCommand;
import org.apache.pinot.tools.admin.command.GitHubEventsQuickStartCommand;
+import org.apache.pinot.tools.admin.command.ImportDataCommand;
import org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand;
import org.apache.pinot.tools.admin.command.MoveReplicaGroup;
import org.apache.pinot.tools.admin.command.OfflineSegmentIntervalCheckerCommand;
@@ -98,6 +99,7 @@ public class PinotAdministrator {
@SubCommand(name = "GenerateData", impl = GenerateDataCommand.class),
@SubCommand(name = "LaunchDataIngestionJob", impl = LaunchDataIngestionJobCommand.class),
@SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class),
+ @SubCommand(name = "ImportData", impl = ImportDataCommand.class),
@SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class),
@SubCommand(name = "StartKafka", impl = StartKafkaCommand.class),
@SubCommand(name = "StreamAvroIntoKafka", impl = StreamAvroIntoKafkaCommand.class),
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
new file mode 100644
index 0000000..36c596f
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.StringArrayOptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to implement ImportData command.
+ */
+@SuppressWarnings("unused")
+public class ImportDataCommand extends AbstractBaseAdminCommand implements Command {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ImportDataCommand.class);
+  // Config key carrying the fixed segment name; users may also supply it through -extraConfigs.
+  private static final String SEGMENT_NAME = "segment.name";
+
+  @Option(name = "-dataFilePath", required = true, metaVar = "<string>", usage = "data file path.")
+  private String _dataFilePath;
+
+  @Option(name = "-format", required = true, metaVar = "<AVRO/CSV/JSON/THRIFT/PARQUET/ORC>", usage = "Input data format.")
+  private FileFormat _format;
+
+  @Option(name = "-table", required = true, metaVar = "<string>", usage = "Table name.")
+  private String _table;
+
+  @Option(name = "-controllerURI", metaVar = "<string>", usage = "Pinot Controller URI.")
+  private String _controllerURI = "http://localhost:9000";
+
+  // Defaults to <system temp dir>/ImportDataCommand.
+  @Option(name = "-tempDir", metaVar = "<string>", usage = "Temporary directory used to hold data during segment creation.")
+  private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath();
+
+  // Raw "key=value" entries; stays null when the option is not given.
+  @Option(name = "-extraConfigs", metaVar = "<extra configs>", handler = StringArrayOptionHandler.class, usage = "Extra configs to be set.")
+  private List<String> _extraConfigs;
+
+  // Deliberately NOT final: a final primitive field with a constant initializer is a compile-time constant, so javac
+  // inlines 'false' wherever it is read (e.g. in getHelp()) and the value the option parser injects reflectively for
+  // -help would never be observed.
+  @SuppressWarnings("FieldCanBeLocal")
+  @Option(name = "-help", help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
+  private boolean _help = false;
+
+ /** Sets the path (or URI) of the input data file and returns this command for chaining. */
+ public ImportDataCommand setDataFilePath(String dataFilePath) {
+ _dataFilePath = dataFilePath;
+ return this;
+ }
+
+ /** Sets the input data format and returns this command for chaining. */
+ public ImportDataCommand setFormat(FileFormat format) {
+ _format = format;
+ return this;
+ }
+
+ /** Sets the target table name and returns this command for chaining. */
+ public ImportDataCommand setTable(String table) {
+ _table = table;
+ return this;
+ }
+
+ /** Sets the Pinot controller URI and returns this command for chaining. */
+ public ImportDataCommand setControllerURI(String controllerURI) {
+ _controllerURI = controllerURI;
+ return this;
+ }
+
+ /** Sets the temporary working directory and returns this command for chaining. */
+ public ImportDataCommand setTempDir(String tempDir) {
+ _tempDir = tempDir;
+ return this;
+ }
+
+ /** Returns the raw extra config entries, or null when none were set. */
+ public List<String> getExtraConfigs() {
+ return _extraConfigs;
+ }
+
+ /** Sets the raw extra config entries (each expected as key=value) and returns this command for chaining. */
+ public ImportDataCommand setExtraConfigs(List<String> extraConfigs) {
+ _extraConfigs = extraConfigs;
+ return this;
+ }
+
+ /** Returns the path (or URI) of the input data file. */
+ public String getDataFilePath() {
+ return _dataFilePath;
+ }
+
+ /** Returns the input data format. */
+ public FileFormat getFormat() {
+ return _format;
+ }
+
+ /** Returns the target table name. */
+ public String getTable() {
+ return _table;
+ }
+
+ /** Returns the Pinot controller URI. */
+ public String getControllerURI() {
+ return _controllerURI;
+ }
+
+ /** Returns the temporary working directory. */
+ public String getTempDir() {
+ return _tempDir;
+ }
+
+  /**
+   * Renders this command with its current option values, in roughly the form it is typed on the command line.
+   * Used for logging when the command is executed.
+   *
+   * @return the command name followed by each option and its value; -extraConfigs is appended only when set
+   */
+  @Override
+  public String toString() {
+    // Print the name the sub-command is registered under in PinotAdministrator ("ImportData"), so the logged
+    // line matches what a user actually types.
+    String results = String
+        .format("ImportData -dataFilePath %s -format %s -table %s -controllerURI %s -tempDir %s", _dataFilePath,
+            _format, _table, _controllerURI, _tempDir);
+    if (_extraConfigs != null) {
+      results += " -extraConfigs " + Arrays.toString(_extraConfigs.toArray());
+    }
+    return results;
+  }
+
+  /**
+   * Returns the sub-command name. It matches the "ImportData" name under which PinotAdministrator registers
+   * this class.
+   */
+  @Override
+  public final String getName() {
+    return "ImportData";
+  }
+
+ /** Returns the one-line description of this sub-command. */
+ @Override
+ public String description() {
+ return "Insert data into Pinot cluster.";
+ }
+
+ /** Returns the value of the -help flag field. */
+ @Override
+ public boolean getHelp() {
+ return _help;
+ }
+
+  /**
+   * Validates the arguments, prepares the temp directory and runs the ingestion job built from them.
+   *
+   * <p>The temp directory is removed once the job has finished, whether it succeeded or failed.
+   *
+   * @return {@code true} when the ingestion job completes without throwing
+   * @throws IOException e.g. when the temp directory cannot be prepared
+   * @throws IllegalArgumentException if a required argument is missing, the data file path is not a valid URI,
+   *     or a path without a URI scheme does not point to an existing file
+   */
+  @Override
+  public boolean execute()
+      throws IOException {
+    LOGGER.info("Executing command: {}", toString());
+    Preconditions.checkArgument(_table != null, "'table' must be specified");
+    Preconditions.checkArgument(_format != null, "'format' must be specified");
+    Preconditions.checkArgument(_dataFilePath != null, "'dataFilePath' must be specified");
+
+    try {
+      // Only paths without a URI scheme are treated as local files and checked for existence up front.
+      URI dataFileURI = URI.create(_dataFilePath);
+      if (dataFileURI.getScheme() == null) {
+        File dataFile = new File(_dataFilePath);
+        Preconditions.checkArgument(dataFile.exists(), "'dataFile': '%s' doesn't exist", dataFile);
+        LOGGER.info("Found data files: {} of format: {}", dataFile, _format);
+      }
+
+      initTempDir();
+      IngestionJobLauncher.runIngestionJob(generateSegmentGenerationJobSpec());
+      LOGGER.info("Successfully load data from {} to Pinot.", _dataFilePath);
+      return true;
+    } finally {
+      // Best-effort cleanup. Failures from the job propagate unchanged, so no catch-and-rethrow is needed.
+      FileUtils.deleteQuietly(new File(_tempDir));
+    }
+  }
+
+ private void initTempDir()
+ throws IOException {
+ File tempDir = new File(_tempDir);
+ if (tempDir.exists()) {
+ LOGGER.info("Deleting the existing 'tempDir': {}", tempDir);
+ FileUtils.forceDelete(tempDir);
+ }
+ FileUtils.forceMkdir(tempDir);
+ }
+
+  /**
+   * Assembles the job spec for a standalone "SegmentCreationAndTarPush" run from the command's options.
+   *
+   * <p>The input directory is derived from the data file URI and narrowed to that file with a glob pattern;
+   * segments are written to the temp directory. Schema and table config are referenced by controller URLs.
+   *
+   * @return the populated job spec
+   */
+  private SegmentGenerationJobSpec generateSegmentGenerationJobSpec() {
+    final Map<String, String> userConfigs = getExtraConfigs(_extraConfigs);
+    final SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+
+    // Input / output locations and job type.
+    final URI inputFile = URI.create(_dataFilePath);
+    final String inputPath = inputFile.getPath();
+    final URI inputDir;
+    if (inputPath.endsWith("/")) {
+      inputDir = inputFile.resolve("..");
+    } else {
+      inputDir = inputFile.resolve(".");
+    }
+    jobSpec.setInputDirURI(inputDir.toString());
+    jobSpec.setIncludeFileNamePattern("glob:**" + inputPath);
+    jobSpec.setOutputDirURI(_tempDir);
+    jobSpec.setCleanUpOutputDir(true);
+    jobSpec.setOverwriteOutput(true);
+    jobSpec.setJobType("SegmentCreationAndTarPush");
+
+    // Execution framework: the standalone runners, referenced by class name.
+    final ExecutionFrameworkSpec frameworkSpec = new ExecutionFrameworkSpec();
+    frameworkSpec.setName("standalone");
+    frameworkSpec.setSegmentGenerationJobRunnerClassName(
+        "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner");
+    frameworkSpec.setSegmentTarPushJobRunnerClassName(
+        "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner");
+    jobSpec.setExecutionFrameworkSpec(frameworkSpec);
+
+    // File systems: local files and S3.
+    final List<PinotFSSpec> fsSpecs = new ArrayList<>();
+    fsSpecs.add(getPinotFSSpec("file", "org.apache.pinot.spi.filesystem.LocalPinotFS", Collections.emptyMap()));
+    fsSpecs.add(
+        getPinotFSSpec("s3", "org.apache.pinot.plugin.filesystem.S3PinotFS", getS3PinotFSConfigs(userConfigs)));
+    jobSpec.setPinotFSSpecs(fsSpecs);
+
+    // Record reader chosen by the input format.
+    final RecordReaderSpec readerSpec = new RecordReaderSpec();
+    readerSpec.setDataFormat(_format.name());
+    readerSpec.setClassName(getRecordReaderClass(_format));
+    readerSpec.setConfigClassName(getRecordReaderConfigClass(_format));
+    readerSpec.setConfigs(IngestionConfigUtils.getRecordReaderProps(userConfigs));
+    jobSpec.setRecordReaderSpec(readerSpec);
+
+    // Table: schema and table config are referenced by controller URLs.
+    final TableSpec targetTable = new TableSpec();
+    targetTable.setTableName(_table);
+    targetTable.setSchemaURI(ControllerRequestURLBuilder.baseUrl(_controllerURI).forTableSchemaGet(_table));
+    targetTable.setTableConfigURI(ControllerRequestURLBuilder.baseUrl(_controllerURI).forTableGet(_table));
+    jobSpec.setTableSpec(targetTable);
+
+    // Segment name: the user-supplied "segment.name" if present, otherwise <table>_<sha256 of the data file path>.
+    final String segmentName;
+    if (userConfigs.containsKey(SEGMENT_NAME)) {
+      segmentName = userConfigs.get(SEGMENT_NAME);
+    } else {
+      segmentName = String.format("%s_%s", _table, DigestUtils.sha256Hex(_dataFilePath));
+    }
+    final SegmentNameGeneratorSpec nameGeneratorSpec = new SegmentNameGeneratorSpec();
+    nameGeneratorSpec.setType(BatchConfigProperties.SegmentNameGeneratorType.FIXED);
+    nameGeneratorSpec.setConfigs(ImmutableMap.of(SEGMENT_NAME, segmentName));
+    jobSpec.setSegmentNameGeneratorSpec(nameGeneratorSpec);
+
+    // Cluster: a single controller.
+    final PinotClusterSpec clusterSpec = new PinotClusterSpec();
+    clusterSpec.setControllerURI(_controllerURI);
+    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
+
+    // Push: 3 attempts with a retry interval of 10000 ms.
+    final PushJobSpec pushSpec = new PushJobSpec();
+    pushSpec.setPushAttempts(3);
+    pushSpec.setPushRetryIntervalMillis(10000);
+    jobSpec.setPushJobSpec(pushSpec);
+
+    return jobSpec;
+  }
+
+ private Map<String, String> getS3PinotFSConfigs(Map<String, String> extraConfigs) {
+ Map<String, String> s3PinotFSConfigs = new HashMap<>();
+ s3PinotFSConfigs.put("region", System.getProperty("AWS_REGION", "us-west-2"));
+ s3PinotFSConfigs.putAll(IngestionConfigUtils.getConfigMapWithPrefix(extraConfigs,
+ BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR));
+ return s3PinotFSConfigs;
+ }
+
+ private PinotFSSpec getPinotFSSpec(String scheme, String className, Map<String, String> configs) {
+ PinotFSSpec pinotFSSpec = new PinotFSSpec();
+ pinotFSSpec.setScheme(scheme);
+ pinotFSSpec.setClassName(className);
+ pinotFSSpec.setConfigs(configs);
+ return pinotFSSpec;
+ }
+
+  /**
+   * Parses the {@code key=value} entries given via -extraConfigs into a map.
+   *
+   * <p>Each entry is split at its first '=', so a value may itself contain '='. An entry without '=' cannot be
+   * interpreted; it is skipped with a warning rather than dropped silently. When a key repeats, the last value wins.
+   *
+   * @param extraConfigs raw entries, or {@code null} when the option was not supplied
+   * @return the parsed configs; an empty immutable map when {@code extraConfigs} is {@code null}
+   */
+  private Map<String, String> getExtraConfigs(List<String> extraConfigs) {
+    if (extraConfigs == null) {
+      return Collections.emptyMap();
+    }
+    Map<String, String> parsedConfigs = new HashMap<>();
+    for (String kvPair : extraConfigs) {
+      // String.split never yields null elements, so checking the length is sufficient.
+      String[] splits = kvPair.split("=", 2);
+      if (splits.length == 2) {
+        parsedConfigs.put(splits[0], splits[1]);
+      } else {
+        LOGGER.warn("Ignoring malformed extra config '{}', expected format: <key>=<value>", kvPair);
+      }
+    }
+    return parsedConfigs;
+  }
+
+ private String getRecordReaderConfigClass(FileFormat format) {
+ switch (format) {
+ case CSV:
+ return "org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig";
+ case PROTO:
+ return "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig";
+ case THRIFT:
+ return "org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig";
+ case ORC:
+ case JSON:
+ case AVRO:
+ case GZIPPED_AVRO:
+ case PARQUET:
+ return null;
+ default:
+ throw new IllegalArgumentException("Unsupported file format - " + format);
+ }
+ }
+
+ private String getRecordReaderClass(FileFormat format) {
+ switch (format) {
+ case CSV:
+ return "org.apache.pinot.plugin.inputformat.csv.CSVRecordReader";
+ case ORC:
+ return "org.apache.pinot.plugin.inputformat.orc.ORCRecordReader";
+ case JSON:
+ return "org.apache.pinot.plugin.inputformat.json.JSONRecordReader";
+ case AVRO:
+ case GZIPPED_AVRO:
+ return "org.apache.pinot.plugin.inputformat.avro.AvroRecordReader";
+ case PARQUET:
+ return "org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader";
+ case PROTO:
+ return "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader";
+ case THRIFT:
+ return "org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader";
+ default:
+ throw new IllegalArgumentException("Unsupported file format - " + format);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org