You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/09/21 23:07:24 UTC

[pinot] branch master updated: Update some usage of BatchConfig (#7459)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6976697  Update some usage of BatchConfig (#7459)
6976697 is described below

commit 6976697aeaa3aa30377f958181eb4f2ba6399fd4
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Tue Sep 21 16:07:09 2021 -0700

    Update some usage of BatchConfig (#7459)
---
 .../resources/PinotIngestionRestletResource.java   |  4 +--
 .../pinot/controller/util/FileIngestionHelper.java | 31 +++++++++++-----------
 .../org/apache/pinot/tools/BootstrapTableTool.java |  8 +++---
 3 files changed, 20 insertions(+), 23 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index 7c0d882..f35ca49 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -47,7 +47,6 @@ import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -193,11 +192,10 @@ public class PinotIngestionRestletResource {
     Map<String, String> batchConfigMap =
         JsonUtils.stringToObject(batchConfigMapStr, new TypeReference<Map<String, String>>() {
         });
-    BatchConfig batchConfig = new BatchConfig(tableNameWithType, batchConfigMap);
     Schema schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
 
     FileIngestionHelper fileIngestionHelper =
-        new FileIngestionHelper(tableConfig, schema, batchConfig, getControllerUri(),
+        new FileIngestionHelper(tableConfig, schema, batchConfigMap, getControllerUri(),
             new File(_controllerConf.getDataDir(), UPLOAD_DIR), getAuthToken());
     return fileIngestionHelper.buildSegmentAndPush(payload);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index dfe5816..4268ea5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -41,7 +41,6 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -69,16 +68,16 @@ public class FileIngestionHelper {
 
   private final TableConfig _tableConfig;
   private final Schema _schema;
-  private final BatchConfig _batchConfig;
+  private final Map<String, String> _batchConfigMap;
   private final URI _controllerUri;
   private final File _uploadDir;
   private final AuthContext _authContext;
 
-  public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, URI controllerUri,
-      File uploadDir, String authToken) {
+  public FileIngestionHelper(TableConfig tableConfig, Schema schema, Map<String, String> batchConfigMap,
+      URI controllerUri, File uploadDir, String authToken) {
     _tableConfig = tableConfig;
     _schema = schema;
-    _batchConfig = batchConfig;
+    _batchConfigMap = batchConfigMap;
     _controllerUri = controllerUri;
     _uploadDir = uploadDir;
     _authContext = new AuthContext(authToken);
@@ -100,15 +99,16 @@ public class FileIngestionHelper {
     File outputDir = new File(workingDir, OUTPUT_SEGMENT_DIR);
     File segmentTarDir = new File(workingDir, SEGMENT_TAR_DIR);
     try {
-      Preconditions
-          .checkState(inputDir.mkdirs(), "Could not create directory for downloading input file locally: %s", inputDir);
-      Preconditions.checkState(segmentTarDir.mkdirs(), "Could not create directory for segment tar file: %s", inputDir);
+      Preconditions.checkState(inputDir.mkdirs(),
+          "Could not create directory for downloading input file locally: %s", inputDir);
+      Preconditions.checkState(segmentTarDir.mkdirs(),
+          "Could not create directory for segment tar file: %s", inputDir);
 
       // Copy file to local working dir
-      File inputFile = new File(inputDir,
-          String.format("%s.%s", DATA_FILE_PREFIX, _batchConfig.getInputFormat().toString().toLowerCase()));
+      File inputFile = new File(inputDir, String.format(
+          "%s.%s", DATA_FILE_PREFIX, _batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI).toLowerCase()));
       if (payload._payloadType == PayloadType.URI) {
-        copyURIToLocal(_batchConfig, payload._uri, inputFile);
+        copyURIToLocal(_batchConfigMap, payload._uri, inputFile);
         LOGGER.info("Copied from URI: {} to local file: {}", payload._uri, inputFile.getAbsolutePath());
       } else {
         copyMultipartToLocal(payload._multiPart, inputFile);
@@ -116,7 +116,7 @@ public class FileIngestionHelper {
       }
 
       // Update batch config map with values for file upload
-      Map<String, String> batchConfigMapOverride = new HashMap<>(_batchConfig.getBatchConfigMap());
+      Map<String, String> batchConfigMapOverride = new HashMap<>(_batchConfigMap);
       batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI, inputFile.getAbsolutePath());
       batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir.getAbsolutePath());
       batchConfigMapOverride.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _controllerUri.toString());
@@ -170,12 +170,13 @@ public class FileIngestionHelper {
   /**
    * Copy the file from given URI to local file
    */
-  public static void copyURIToLocal(BatchConfig batchConfig, URI sourceFileURI, File destFile)
+  public static void copyURIToLocal(Map<String, String> batchConfigMap, URI sourceFileURI, File destFile)
       throws Exception {
     String sourceFileURIScheme = sourceFileURI.getScheme();
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
-      PinotFSFactory.register(sourceFileURIScheme, batchConfig.getInputFsClassName(),
-          IngestionConfigUtils.getInputFsProps(batchConfig.getInputFsProps()));
+      PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
+          IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
     }
     PinotFSFactory.create(sourceFileURIScheme).copyToLocalFile(sourceFileURI, destFile);
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 00eb71f..0534dca 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -34,7 +34,6 @@ import org.apache.pinot.common.minion.MinionClient;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.util.TlsUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
@@ -128,7 +127,7 @@ public class BootstrapTableTool {
         JsonUtils.inputStreamToObject(new FileInputStream(offlineTableConfigFile), TableConfig.class);
     if (tableConfig.getIngestionConfig() != null
         && tableConfig.getIngestionConfig().getBatchIngestionConfig() != null) {
-      updatedTableConfig(tableConfig, tableName, setupTableTmpDir);
+      updatedTableConfig(tableConfig, setupTableTmpDir);
     }
 
     LOGGER.info("Adding offline table: {}", tableName);
@@ -190,13 +189,12 @@ public class BootstrapTableTool {
     return true;
   }
 
-  private void updatedTableConfig(TableConfig tableConfig, String tableName, File setupTableTmpDir)
+  private void updatedTableConfig(TableConfig tableConfig, File setupTableTmpDir)
       throws Exception {
     final List<Map<String, String>> batchConfigsMaps =
         tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
     for (Map<String, String> batchConfigsMap : batchConfigsMaps) {
-      BatchConfig batchConfig = new BatchConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName), batchConfigsMap);
-      String inputDirURI = batchConfig.getInputDirURI();
+      String inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
       if (!new File(inputDirURI).exists()) {
         URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
         if (resolvedInputDirURI != null) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org