You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/09/21 23:07:24 UTC
[pinot] branch master updated: Update some usage of BatchConfig (#7459)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6976697 Update some usage of BatchConfig (#7459)
6976697 is described below
commit 6976697aeaa3aa30377f958181eb4f2ba6399fd4
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Tue Sep 21 16:07:09 2021 -0700
Update some usage of BatchConfig (#7459)
---
.../resources/PinotIngestionRestletResource.java | 4 +--
.../pinot/controller/util/FileIngestionHelper.java | 31 +++++++++++-----------
.../org/apache/pinot/tools/BootstrapTableTool.java | 8 +++---
3 files changed, 20 insertions(+), 23 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index 7c0d882..f35ca49 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -47,7 +47,6 @@ import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -193,11 +192,10 @@ public class PinotIngestionRestletResource {
Map<String, String> batchConfigMap =
JsonUtils.stringToObject(batchConfigMapStr, new TypeReference<Map<String, String>>() {
});
- BatchConfig batchConfig = new BatchConfig(tableNameWithType, batchConfigMap);
Schema schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
FileIngestionHelper fileIngestionHelper =
- new FileIngestionHelper(tableConfig, schema, batchConfig, getControllerUri(),
+ new FileIngestionHelper(tableConfig, schema, batchConfigMap, getControllerUri(),
new File(_controllerConf.getDataDir(), UPLOAD_DIR), getAuthToken());
return fileIngestionHelper.buildSegmentAndPush(payload);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index dfe5816..4268ea5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -41,7 +41,6 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -69,16 +68,16 @@ public class FileIngestionHelper {
private final TableConfig _tableConfig;
private final Schema _schema;
- private final BatchConfig _batchConfig;
+ private final Map<String, String> _batchConfigMap;
private final URI _controllerUri;
private final File _uploadDir;
private final AuthContext _authContext;
- public FileIngestionHelper(TableConfig tableConfig, Schema schema, BatchConfig batchConfig, URI controllerUri,
- File uploadDir, String authToken) {
+ public FileIngestionHelper(TableConfig tableConfig, Schema schema, Map<String, String> batchConfigMap,
+ URI controllerUri, File uploadDir, String authToken) {
_tableConfig = tableConfig;
_schema = schema;
- _batchConfig = batchConfig;
+ _batchConfigMap = batchConfigMap;
_controllerUri = controllerUri;
_uploadDir = uploadDir;
_authContext = new AuthContext(authToken);
@@ -100,15 +99,16 @@ public class FileIngestionHelper {
File outputDir = new File(workingDir, OUTPUT_SEGMENT_DIR);
File segmentTarDir = new File(workingDir, SEGMENT_TAR_DIR);
try {
- Preconditions
- .checkState(inputDir.mkdirs(), "Could not create directory for downloading input file locally: %s", inputDir);
- Preconditions.checkState(segmentTarDir.mkdirs(), "Could not create directory for segment tar file: %s", inputDir);
+ Preconditions.checkState(inputDir.mkdirs(),
+ "Could not create directory for downloading input file locally: %s", inputDir);
+ Preconditions.checkState(segmentTarDir.mkdirs(),
+ "Could not create directory for segment tar file: %s", inputDir);
// Copy file to local working dir
- File inputFile = new File(inputDir,
- String.format("%s.%s", DATA_FILE_PREFIX, _batchConfig.getInputFormat().toString().toLowerCase()));
+ File inputFile = new File(inputDir, String.format(
+ "%s.%s", DATA_FILE_PREFIX, _batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI).toLowerCase()));
if (payload._payloadType == PayloadType.URI) {
- copyURIToLocal(_batchConfig, payload._uri, inputFile);
+ copyURIToLocal(_batchConfigMap, payload._uri, inputFile);
LOGGER.info("Copied from URI: {} to local file: {}", payload._uri, inputFile.getAbsolutePath());
} else {
copyMultipartToLocal(payload._multiPart, inputFile);
@@ -116,7 +116,7 @@ public class FileIngestionHelper {
}
// Update batch config map with values for file upload
- Map<String, String> batchConfigMapOverride = new HashMap<>(_batchConfig.getBatchConfigMap());
+ Map<String, String> batchConfigMapOverride = new HashMap<>(_batchConfigMap);
batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI, inputFile.getAbsolutePath());
batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir.getAbsolutePath());
batchConfigMapOverride.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _controllerUri.toString());
@@ -170,12 +170,13 @@ public class FileIngestionHelper {
/**
* Copy the file from given URI to local file
*/
- public static void copyURIToLocal(BatchConfig batchConfig, URI sourceFileURI, File destFile)
+ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI sourceFileURI, File destFile)
throws Exception {
String sourceFileURIScheme = sourceFileURI.getScheme();
if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
- PinotFSFactory.register(sourceFileURIScheme, batchConfig.getInputFsClassName(),
- IngestionConfigUtils.getInputFsProps(batchConfig.getInputFsProps()));
+ PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
+ IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
+ batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
}
PinotFSFactory.create(sourceFileURIScheme).copyToLocalFile(sourceFileURI, destFile);
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 00eb71f..0534dca 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -34,7 +34,6 @@ import org.apache.pinot.common.minion.MinionClient;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.util.TlsUtils;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
@@ -128,7 +127,7 @@ public class BootstrapTableTool {
JsonUtils.inputStreamToObject(new FileInputStream(offlineTableConfigFile), TableConfig.class);
if (tableConfig.getIngestionConfig() != null
&& tableConfig.getIngestionConfig().getBatchIngestionConfig() != null) {
- updatedTableConfig(tableConfig, tableName, setupTableTmpDir);
+ updatedTableConfig(tableConfig, setupTableTmpDir);
}
LOGGER.info("Adding offline table: {}", tableName);
@@ -190,13 +189,12 @@ public class BootstrapTableTool {
return true;
}
- private void updatedTableConfig(TableConfig tableConfig, String tableName, File setupTableTmpDir)
+ private void updatedTableConfig(TableConfig tableConfig, File setupTableTmpDir)
throws Exception {
final List<Map<String, String>> batchConfigsMaps =
tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
for (Map<String, String> batchConfigsMap : batchConfigsMaps) {
- BatchConfig batchConfig = new BatchConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName), batchConfigsMap);
- String inputDirURI = batchConfig.getInputDirURI();
+ String inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
if (!new File(inputDirURI).exists()) {
URL resolvedInputDirURI = BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
if (resolvedInputDirURI != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org