Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/30 07:01:57 UTC

[incubator-pinot] 01/01: Adding InsertData sub command for pinot-admin

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_InsertData_sub_command
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5985dc5c0eeaec0516cf8cd33b251c9617cd6540
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Dec 29 16:21:53 2020 -0800

    Adding InsertData sub command for pinot-admin
---
 .../helix/ControllerRequestURLBuilder.java         |   3 +
 .../batch/common/SegmentGenerationTaskRunner.java  |   7 +
 pinot-tools/pom.xml                                |  16 +-
 .../pinot/tools/admin/PinotAdministrator.java      |   2 +
 .../tools/admin/command/ImportDataCommand.java     | 337 +++++++++++++++++++++
 5 files changed, 359 insertions(+), 6 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index 5d3e297..333d1e6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -211,6 +211,9 @@ public class ControllerRequestURLBuilder {
     }
     return url;
   }
+  public String forTableSchemaGet(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "schema");
+  }
 
   public String forSchemaCreate() {
     return StringUtil.join("/", _baseUrl, "schemas");
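
The new helper simply joins the controller base URL with the table's schema path; a minimal sketch of how it might be called (the controller address and table name below are placeholders):

    // Hypothetical usage of the new URL helper; values are placeholders.
    String schemaUrl = ControllerRequestURLBuilder.baseUrl("http://localhost:9000")
        .forTableSchemaGet("myTable");
    // schemaUrl -> "http://localhost:9000/tables/myTable/schema"
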
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 2bd0762..7b9a5ef 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
 import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
@@ -44,9 +45,13 @@ import org.apache.pinot.spi.utils.JsonUtils;
 
 public class SegmentGenerationTaskRunner implements Serializable {
 
+  public static final String FIXED_SEGMENT_NAME_GENERATOR = "fixed";
   public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
   public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate";
 
+  // For FixedSegmentNameGenerator
+  public static final String SEGMENT_NAME = "segment.name";
+
   // For SimpleSegmentNameGenerator
   public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
 
@@ -123,6 +128,8 @@ public class SegmentGenerationTaskRunner implements Serializable {
       segmentNameGeneratorConfigs = new HashMap<>();
     }
     switch (segmentNameGeneratorType) {
+      case FIXED_SEGMENT_NAME_GENERATOR:
+        return new FixedSegmentNameGenerator(segmentNameGeneratorConfigs.get(SEGMENT_NAME));
       case SIMPLE_SEGMENT_NAME_GENERATOR:
         return new SimpleSegmentNameGenerator(tableName, segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX));
       case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
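
The new "fixed" generator type lets a job pin an exact segment name instead of deriving one from the table name or time range. A minimal sketch of selecting it from a job spec, assuming the spec's type field takes the "fixed" string constant added above (the segment name itself is a placeholder):

    // Sketch: pick the fixed generator ("fixed") and supply the
    // "segment.name" config it reads; the name value is a placeholder.
    SegmentNameGeneratorSpec nameSpec = new SegmentNameGeneratorSpec();
    nameSpec.setType("fixed");
    nameSpec.setConfigs(ImmutableMap.of("segment.name", "myTable_batch0"));
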
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 4dd0a16..25b6516 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -57,6 +57,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-batch-ingestion-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-avro</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -67,6 +72,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-protobuf</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-json</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -92,12 +102,6 @@
       <scope>runtime</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-batch-ingestion-standalone</artifactId>
-      <version>${project.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
     </dependency>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index d1fc6df..4e0bf65 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.tools.admin.command.CreateSegmentCommand;
 import org.apache.pinot.tools.admin.command.DeleteClusterCommand;
 import org.apache.pinot.tools.admin.command.GenerateDataCommand;
 import org.apache.pinot.tools.admin.command.GitHubEventsQuickStartCommand;
+import org.apache.pinot.tools.admin.command.ImportDataCommand;
 import org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand;
 import org.apache.pinot.tools.admin.command.MoveReplicaGroup;
 import org.apache.pinot.tools.admin.command.OfflineSegmentIntervalCheckerCommand;
@@ -98,6 +99,7 @@ public class PinotAdministrator {
       @SubCommand(name = "GenerateData", impl = GenerateDataCommand.class),
       @SubCommand(name = "LaunchDataIngestionJob", impl = LaunchDataIngestionJobCommand.class),
       @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class),
+      @SubCommand(name = "ImportData", impl = ImportDataCommand.class),
       @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class),
       @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class),
       @SubCommand(name = "StreamAvroIntoKafka", impl = StreamAvroIntoKafkaCommand.class),
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
new file mode 100644
index 0000000..36c596f
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/ImportDataCommand.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.StringArrayOptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to implement ImportData command.
+ */
+@SuppressWarnings("unused")
+public class ImportDataCommand extends AbstractBaseAdminCommand implements Command {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ImportDataCommand.class);
+  private static final String SEGMENT_NAME = "segment.name";
+
+  @Option(name = "-dataFilePath", required = true, metaVar = "<string>", usage = "data file path.")
+  private String _dataFilePath;
+
+  @Option(name = "-format", required = true, metaVar = "<AVRO/CSV/JSON/THRIFT/PARQUET/ORC>", usage = "Input data format.")
+  private FileFormat _format;
+
+  @Option(name = "-table", required = true, metaVar = "<string>", usage = "Table name.")
+  private String _table;
+
+  @Option(name = "-controllerURI", metaVar = "<string>", usage = "Pinot Controller URI.")
+  private String _controllerURI = "http://localhost:9000";
+
+  @Option(name = "-tempDir", metaVar = "<string>", usage = "Temporary directory used to hold data during segment creation.")
+  private String _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName()).getAbsolutePath();
+
+  @Option(name = "-extraConfigs", metaVar = "<extra configs>", handler = StringArrayOptionHandler.class, usage = "Extra configs to be set.")
+  private List<String> _extraConfigs;
+
+  @SuppressWarnings("FieldCanBeLocal")
+  @Option(name = "-help", help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
+  private final boolean _help = false;
+
+  public ImportDataCommand setDataFilePath(String dataFilePath) {
+    _dataFilePath = dataFilePath;
+    return this;
+  }
+
+  public ImportDataCommand setFormat(FileFormat format) {
+    _format = format;
+    return this;
+  }
+
+  public ImportDataCommand setTable(String table) {
+    _table = table;
+    return this;
+  }
+
+  public ImportDataCommand setControllerURI(String controllerURI) {
+    _controllerURI = controllerURI;
+    return this;
+  }
+
+  public ImportDataCommand setTempDir(String tempDir) {
+    _tempDir = tempDir;
+    return this;
+  }
+
+  public List<String> getExtraConfigs() {
+    return _extraConfigs;
+  }
+
+  public ImportDataCommand setExtraConfigs(List<String> extraConfigs) {
+    _extraConfigs = extraConfigs;
+    return this;
+  }
+
+  public String getDataFilePath() {
+    return _dataFilePath;
+  }
+
+  public FileFormat getFormat() {
+    return _format;
+  }
+
+  public String getTable() {
+    return _table;
+  }
+
+  public String getControllerURI() {
+    return _controllerURI;
+  }
+
+  public String getTempDir() {
+    return _tempDir;
+  }
+
+  @Override
+  public String toString() {
+    String results = String
+        .format("ImportData -dataFilePath %s -format %s -table %s -controllerURI %s -tempDir %s", _dataFilePath,
+            _format, _table, _controllerURI, _tempDir);
+    if (_extraConfigs != null) {
+      results += " -extraConfigs " + Arrays.toString(_extraConfigs.toArray());
+    }
+    return results;
+  }
+
+  @Override
+  public final String getName() {
+    return "InsertData";
+  }
+
+  @Override
+  public String description() {
+    return "Insert data into Pinot cluster.";
+  }
+
+  @Override
+  public boolean getHelp() {
+    return _help;
+  }
+
+  @Override
+  public boolean execute()
+      throws IOException {
+    LOGGER.info("Executing command: {}", toString());
+    Preconditions.checkArgument(_table != null, "'table' must be specified");
+    Preconditions.checkArgument(_format != null, "'format' must be specified");
+    Preconditions.checkArgument(_dataFilePath != null, "'dataFilePath' must be specified");
+
+    try {
+      URI dataFileURI = URI.create(_dataFilePath);
+      if (dataFileURI.getScheme() == null) {
+        File dataFile = new File(_dataFilePath);
+        Preconditions.checkArgument(dataFile.exists(), "'dataFile': '%s' doesn't exist", dataFile);
+        LOGGER.info("Found data file: {} of format: {}", dataFile, _format);
+      }
+
+      initTempDir();
+      IngestionJobLauncher.runIngestionJob(generateSegmentGenerationJobSpec());
+      LOGGER.info("Successfully loaded data from {} into Pinot.", _dataFilePath);
+      return true;
+    } finally {
+      FileUtils.deleteQuietly(new File(_tempDir));
+    }
+  }
+
+  private void initTempDir()
+      throws IOException {
+    File tempDir = new File(_tempDir);
+    if (tempDir.exists()) {
+      LOGGER.info("Deleting the existing 'tempDir': {}", tempDir);
+      FileUtils.forceDelete(tempDir);
+    }
+    FileUtils.forceMkdir(tempDir);
+  }
+
+  private SegmentGenerationJobSpec generateSegmentGenerationJobSpec() {
+    final Map<String, String> extraConfigs = getExtraConfigs(_extraConfigs);
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+    URI dataFileURI = URI.create(_dataFilePath);
+    URI parent = dataFileURI.getPath().endsWith("/") ? dataFileURI.resolve("..") : dataFileURI.resolve(".");
+    spec.setInputDirURI(parent.toString());
+    spec.setIncludeFileNamePattern("glob:**" + dataFileURI.getPath());
+    spec.setOutputDirURI(_tempDir);
+    spec.setCleanUpOutputDir(true);
+    spec.setOverwriteOutput(true);
+    spec.setJobType("SegmentCreationAndTarPush");
+
+    // set ExecutionFrameworkSpec
+    ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
+    executionFrameworkSpec.setName("standalone");
+    executionFrameworkSpec.setSegmentGenerationJobRunnerClassName(
+        "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner");
+    executionFrameworkSpec.setSegmentTarPushJobRunnerClassName(
+        "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner");
+    spec.setExecutionFrameworkSpec(executionFrameworkSpec);
+
+    // set PinotFSSpecs
+    List<PinotFSSpec> pinotFSSpecs = new ArrayList<>();
+    pinotFSSpecs.add(getPinotFSSpec("file", "org.apache.pinot.spi.filesystem.LocalPinotFS", Collections.emptyMap()));
+    pinotFSSpecs
+        .add(getPinotFSSpec("s3", "org.apache.pinot.plugin.filesystem.S3PinotFS", getS3PinotFSConfigs(extraConfigs)));
+    spec.setPinotFSSpecs(pinotFSSpecs);
+
+    // set RecordReaderSpec
+    RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
+    recordReaderSpec.setDataFormat(_format.name());
+    recordReaderSpec.setClassName(getRecordReaderClass(_format));
+    recordReaderSpec.setConfigClassName(getRecordReaderConfigClass(_format));
+    recordReaderSpec.setConfigs(IngestionConfigUtils.getRecordReaderProps(extraConfigs));
+    spec.setRecordReaderSpec(recordReaderSpec);
+
+    // set TableSpec
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName(_table);
+    tableSpec.setSchemaURI(ControllerRequestURLBuilder.baseUrl(_controllerURI).forTableSchemaGet(_table));
+    tableSpec.setTableConfigURI(ControllerRequestURLBuilder.baseUrl(_controllerURI).forTableGet(_table));
+    spec.setTableSpec(tableSpec);
+
+    // set SegmentNameGeneratorSpec
+    SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
+    segmentNameGeneratorSpec
+        .setType(BatchConfigProperties.SegmentNameGeneratorType.FIXED);
+    String segmentName = (extraConfigs.containsKey(SEGMENT_NAME)) ? extraConfigs.get(SEGMENT_NAME)
+        : String.format("%s_%s", _table, DigestUtils.sha256Hex(_dataFilePath));
+    segmentNameGeneratorSpec.setConfigs(ImmutableMap.of(SEGMENT_NAME, segmentName));
+    spec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
+
+    // set PinotClusterSpecs
+    PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+    pinotClusterSpec.setControllerURI(_controllerURI);
+    PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};
+    spec.setPinotClusterSpecs(pinotClusterSpecs);
+
+    // set PushJobSpec
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setPushAttempts(3);
+    pushJobSpec.setPushRetryIntervalMillis(10000);
+    spec.setPushJobSpec(pushJobSpec);
+
+    return spec;
+  }
+
+  private Map<String, String> getS3PinotFSConfigs(Map<String, String> extraConfigs) {
+    Map<String, String> s3PinotFSConfigs = new HashMap<>();
+    s3PinotFSConfigs.put("region", System.getProperty("AWS_REGION", "us-west-2"));
+    s3PinotFSConfigs.putAll(IngestionConfigUtils.getConfigMapWithPrefix(extraConfigs,
+        BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR));
+    return s3PinotFSConfigs;
+  }
+
+  private PinotFSSpec getPinotFSSpec(String scheme, String className, Map<String, String> configs) {
+    PinotFSSpec pinotFSSpec = new PinotFSSpec();
+    pinotFSSpec.setScheme(scheme);
+    pinotFSSpec.setClassName(className);
+    pinotFSSpec.setConfigs(configs);
+    return pinotFSSpec;
+  }
+
+  private Map<String, String> getExtraConfigs(List<String> extraConfigs) {
+    if (extraConfigs == null) {
+      return Collections.emptyMap();
+    }
+    Map<String, String> recordReaderConfigs = new HashMap<>();
+    for (String kvPair : extraConfigs) {
+      String[] splits = kvPair.split("=", 2);
+      if ((splits.length == 2) && (splits[0] != null) && (splits[1] != null)) {
+        recordReaderConfigs.put(splits[0], splits[1]);
+      }
+    }
+    return recordReaderConfigs;
+  }
+
+  private String getRecordReaderConfigClass(FileFormat format) {
+    switch (format) {
+      case CSV:
+        return "org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig";
+      case PROTO:
+        return "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig";
+      case THRIFT:
+        return "org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig";
+      case ORC:
+      case JSON:
+      case AVRO:
+      case GZIPPED_AVRO:
+      case PARQUET:
+        return null;
+      default:
+        throw new IllegalArgumentException("Unsupported file format - " + format);
+    }
+  }
+
+  private String getRecordReaderClass(FileFormat format) {
+    switch (format) {
+      case CSV:
+        return "org.apache.pinot.plugin.inputformat.csv.CSVRecordReader";
+      case ORC:
+        return "org.apache.pinot.plugin.inputformat.orc.ORCRecordReader";
+      case JSON:
+        return "org.apache.pinot.plugin.inputformat.json.JSONRecordReader";
+      case AVRO:
+      case GZIPPED_AVRO:
+        return "org.apache.pinot.plugin.inputformat.avro.AvroRecordReader";
+      case PARQUET:
+        return "org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader";
+      case PROTO:
+        return "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReader";
+      case THRIFT:
+        return "org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader";
+      default:
+        throw new IllegalArgumentException("Unsupported file format - " + format);
+    }
+  }
+}
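
The command can also be driven programmatically through its fluent setters; a minimal sketch, assuming a local CSV file and an already-created table (path, table name, and controller address are placeholders):

    // Minimal programmatic sketch; all values are placeholders.
    // execute() declares IOException, so callers handle or propagate it.
    boolean succeeded = new ImportDataCommand()
        .setDataFilePath("/tmp/events.csv")
        .setFormat(FileFormat.CSV)
        .setTable("myTable")
        .setControllerURI("http://localhost:9000")
        .execute();

Either way, the command builds the SegmentGenerationJobSpec shown above: segments are generated under the temp directory by the standalone runner, tar-pushed to the controller, and the temp directory is cleaned up afterwards.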

