You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/14 19:45:38 UTC
[incubator-pinot] branch master updated: simplify batch config and
corresponding utils (#6332)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4183ffe simplify batch config and corresponding utils (#6332)
4183ffe is described below
commit 4183ffe71c19944312a99555392033cbd7481ac4
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Dec 14 11:45:04 2020 -0800
simplify batch config and corresponding utils (#6332)
---
.../pinot/core/util/TableConfigUtilsTest.java | 19 ++-
.../pinot/spi/filesystem/PinotFSFactory.java | 2 +-
.../pinot/spi/ingestion/batch/BatchConfig.java | 83 ++++++-------
.../spi/ingestion/batch/BatchConfigProperties.java | 49 +++++---
.../pinot/spi/utils/IngestionConfigUtils.java | 42 ++++++-
.../pinot/spi/ingestion/batch/BatchConfigTest.java | 133 +++++----------------
6 files changed, 145 insertions(+), 183 deletions(-)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index ae313c0..f269cae 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -377,16 +377,12 @@ public class TableConfigUtilsTest {
@Test
public void ingestionBatchConfigsTest() {
Map<String, String> batchConfigMap = new HashMap<>();
- batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, "s3");
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_DIR_URI), "s3://foo");
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.OUTPUT_DIR_URI), "s3://bar");
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.FS_CLASS), "org.foo.S3FS");
- batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_FORMAT), "avro");
- batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.RECORD_READER_CLASS),
- "org.foo.Reader");
+ batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, "org.foo.S3FS");
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, "org.foo.GcsFS");
+ batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");
+ batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, "org.foo.Reader");
IngestionConfig ingestionConfig =
new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, null),
@@ -395,8 +391,7 @@ public class TableConfigUtilsTest {
new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
.build();
TableConfigUtils.validateIngestionConfig(tableConfig, null);
-
- batchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
+ batchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT);
try {
TableConfigUtils.validateIngestionConfig(tableConfig, null);
Assert.fail("Should fail for invalid batch config map");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index b163f5c..6366d95 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -38,8 +38,8 @@ public class PinotFSFactory {
private PinotFSFactory() {
}
+ public static final String LOCAL_PINOT_FS_SCHEME = "file";
private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class);
- private static final String LOCAL_PINOT_FS_SCHEME = "file";
private static final String CLASS = "class";
private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() {
{
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index 965eb9a..780f711 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -28,62 +28,43 @@ import org.apache.pinot.spi.data.readers.FileFormat;
* Provides all config related to the batch data source, as configured in the table config's ingestion config
*/
public class BatchConfig {
+ private final Map<String, String> _batchConfigMap;
+
private final String _tableNameWithType;
- private final String _type;
private final String _inputDirURI;
private final String _outputDirURI;
- private final String _fsClassName;
- private final Map<String, String> _fsProps = new HashMap<>();
+ private final String _inputFsClassName;
+ private final Map<String, String> _inputFsProps = new HashMap<>();
+ private final String _outputFsClassName;
+ private final Map<String, String> _outputFsProps = new HashMap<>();
private final FileFormat _inputFormat;
private final String _recordReaderClassName;
private final String _recordReaderConfigClassName;
private final Map<String, String> _recordReaderProps = new HashMap<>();
- private final Map<String, String> _batchConfigMap = new HashMap<>();
-
public BatchConfig(String tableNameWithType, Map<String, String> batchConfigsMap) {
+ _batchConfigMap = batchConfigsMap;
_tableNameWithType = tableNameWithType;
-
- _type = batchConfigsMap.get(BatchConfigProperties.BATCH_TYPE);
- Preconditions.checkState(_type != null, "Property: %s cannot be null", BatchConfigProperties.BATCH_TYPE);
-
- String inputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_DIR_URI);
- _inputDirURI = batchConfigsMap.get(inputDirURIKey);
- Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", inputDirURIKey);
-
- String outputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.OUTPUT_DIR_URI);
- _outputDirURI = batchConfigsMap.get(outputDirURIKey);
- Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", outputDirURIKey);
-
- String fsClassNameKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_CLASS);
- _fsClassName = batchConfigsMap.get(fsClassNameKey);
- Preconditions.checkState(_fsClassName != null, "Property: %s cannot be null", fsClassNameKey);
-
- String inputFormatKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_FORMAT);
- String inputFormat = batchConfigsMap.get(inputFormatKey);
- Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", inputFormat);
+ String inputFormat = batchConfigsMap.get(BatchConfigProperties.INPUT_FORMAT);
+ Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_FORMAT);
_inputFormat = FileFormat.valueOf(inputFormat.toUpperCase());
-
- String recordReaderClassNameKey =
- BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CLASS);
- _recordReaderClassName = batchConfigsMap.get(recordReaderClassNameKey);
- Preconditions.checkState(_recordReaderClassName != null, "Property: %s cannot be null", recordReaderClassNameKey);
-
- String recordReaderConfigClassNameKey =
- BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
- _recordReaderConfigClassName = batchConfigsMap.get(recordReaderConfigClassNameKey);
-
- String fsPropPrefix = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_PROP_PREFIX);
- String recordReaderPropPrefix =
- BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_PROP_PREFIX);
+ _inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
+ _inputFsClassName = batchConfigsMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+ _outputDirURI = batchConfigsMap.get(BatchConfigProperties.OUTPUT_DIR_URI);
+ _outputFsClassName = batchConfigsMap.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+ _recordReaderClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CLASS);
+ _recordReaderConfigClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
for (Map.Entry<String, String> entry : batchConfigsMap.entrySet()) {
String key = entry.getKey();
- if (key.startsWith(fsPropPrefix)) {
- _fsProps.put(key, entry.getValue());
- } else if (key.startsWith(recordReaderPropPrefix)) {
+ if (key.startsWith(BatchConfigProperties.INPUT_FS_PROP_PREFIX)) {
+ _inputFsProps.put(key, entry.getValue());
+ }
+ if (key.startsWith(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX)) {
+ _outputFsProps.put(key, entry.getValue());
+ }
+ if (key.startsWith(BatchConfigProperties.RECORD_READER_PROP_PREFIX)) {
_recordReaderProps.put(key, entry.getValue());
}
- _batchConfigMap.put(key, entry.getValue());
}
}
@@ -91,10 +72,6 @@ public class BatchConfig {
return _tableNameWithType;
}
- public String getType() {
- return _type;
- }
-
public String getInputDirURI() {
return _inputDirURI;
}
@@ -103,12 +80,20 @@ public class BatchConfig {
return _outputDirURI;
}
- public String getFsClassName() {
- return _fsClassName;
+ public String getInputFsClassName() {
+ return _inputFsClassName;
+ }
+
+ public Map<String, String> getInputFsProps() {
+ return _inputFsProps;
+ }
+
+ public String getOutputFsClassName() {
+ return _outputFsClassName;
}
- public Map<String, String> getFsProps() {
- return _fsProps;
+ public Map<String, String> getOutputFsProps() {
+ return _outputFsProps;
}
public FileFormat getInputFormat() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5d276aa..2d83335 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -18,33 +18,52 @@
*/
package org.apache.pinot.spi.ingestion.batch;
-import org.apache.commons.lang3.StringUtils;
-
-
/**
* Defines all the keys used in the batch configs map
*/
public class BatchConfigProperties {
- public static final String DOT_SEPARATOR = ".";
- public static final String BATCH_PREFIX = "batch";
+ public static final String TABLE_CONFIGS = "tableConfigs";
+ public static final String TABLE_NAME = "tableName";
- public static final String BATCH_TYPE = "batchType";
public static final String INPUT_DIR_URI = "inputDirURI";
public static final String OUTPUT_DIR_URI = "outputDirURI";
- public static final String FS_CLASS = "fs.className";
- public static final String FS_PROP_PREFIX = "fs.prop";
+ public static final String INPUT_FS_CLASS = "input.fs.className";
+ public static final String INPUT_FS_PROP_PREFIX = "input.fs.prop";
+ public static final String OUTPUT_FS_CLASS = "output.fs.className";
+ public static final String OUTPUT_FS_PROP_PREFIX = "output.fs.prop";
public static final String INPUT_FORMAT = "inputFormat";
+ public static final String INCLUDE_FILE_NAME_PATTERN = "includeFileNamePattern";
+ public static final String EXCLUDE_FILE_NAME_PATTERN = "excludeFileNamePattern";
public static final String RECORD_READER_CLASS = "recordReader.className";
- public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
+ public static final String RECORD_READER_CONFIG_CLASS = "recordReader.configClassName";
public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
-
+ public static final String SCHEMA = "schema";
+ public static final String SCHEMA_URI = "schemaURI";
+ public static final String SEQUENCE_ID = "sequenceId";
+ public static final String SEGMENT_NAME_GENERATOR_TYPE = "segmentNameGenerator.type";
+ public static final String SEGMENT_NAME_GENERATOR_CONFIGS = "segmentNameGenerator.configs";
+ public static final String OVERWRITE_OUTPUT = "overwriteOutput";
public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
- /**
- * Helper method to create a batch config property
- */
- public static String constructBatchProperty(String batchType, String property) {
- return StringUtils.join(BATCH_PREFIX, batchType, property, DOT_SEPARATOR);
+ public static final String PUSH_MODE = "push.mode";
+ public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
+ public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
+ public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+
+ public static final String INPUT_FILE_URI = "input.file.uri";
+ public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
+
+ public enum SegmentIngestionType {
+ APPEND, REPLACE
+ }
+
+ public class SegmentNameGeneratorType {
+ public static final String SIMPLE = "simple";
+ public static final String NORMALIZED_DATE = "normalizedDate";
+ public static final String FIXED = "fixed";
}
+ public enum SegmentPushType {
+ TAR, URI, METADATA
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 62499dd..84e0dbe 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -19,17 +19,23 @@
package org.apache.pinot.spi.utils;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
/**
* Helper methods for extracting fields from IngestionConfig in a backward compatible manner
*/
public final class IngestionConfigUtils {
+ public static final String DOT_SEPARATOR = ".";
+ private static final String DEFAULT_PUSH_MODE = "metadata";
+
/**
* Fetches the streamConfig from the given realtime table.
* First, the ingestionConfigs->stream->streamConfigs will be checked.
@@ -42,8 +48,10 @@ public final class IngestionConfigUtils {
Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
"Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
Map<String, String> streamConfigMap = null;
- if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
- List<Map<String, String>> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+ if (tableConfig.getIngestionConfig() != null
+ && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
+ List<Map<String, String>> streamConfigMaps =
+ tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table");
streamConfigMap = streamConfigMaps.get(0);
}
@@ -92,4 +100,34 @@ public final class IngestionConfigUtils {
return segmentIngestionFrequency;
}
+ public static PinotConfiguration getFsProps(Map<String, String> batchConfigMap) {
+ return new PinotConfiguration(getPropsWithPrefix(batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + DOT_SEPARATOR));
+ }
+
+ public static Map<String, Object> getPropsWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+ Map<String, Object> props = new HashMap<>();
+ props.putAll(getConfigMapWithPrefix(batchConfigMap, prefix));
+ return props;
+ }
+
+ public static Map<String, String> getConfigMapWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+ Map<String, String> props = new HashMap<>();
+ for (String configKey : batchConfigMap.keySet()) {
+ if (configKey.startsWith(prefix)) {
+ String[] splits = configKey.split(prefix, 2);
+ if (splits.length > 1) {
+ props.put(splits[1], batchConfigMap.get(configKey));
+ }
+ }
+ }
+ return props;
+ }
+
+ public static String getPushMode(Map<String, String> batchConfigMap) {
+ String pushMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE);
+ if (pushMode == null) {
+ pushMode = DEFAULT_PUSH_MODE;
+ }
+ return pushMode;
+ }
}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
index bc34f1a..851c552 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
@@ -36,136 +36,61 @@ public class BatchConfigTest {
public void testBatchConfig() {
Map<String, String> batchConfigMap = new HashMap<>();
String tableName = "foo_REALTIME";
- String batchType = "s3";
String inputDir = "s3://foo/input";
String outputDir = "s3://foo/output";
- String fsClass = "org.apache.S3FS";
+ String inputFsClass = "org.apache.S3FS";
+ String outputFsClass = "org.apache.GcsGS";
String region = "us-west";
String username = "foo";
+ String accessKey = "${ACCESS_KEY}";
+ String secretKey = "${SECRET_KEY}";
String inputFormat = "csv";
String recordReaderClass = "org.foo.CSVRecordReader";
String recordReaderConfigClass = "org.foo.CSVRecordReaderConfig";
String separator = "|";
- batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, batchType);
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI), inputDir);
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI), outputDir);
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS), fsClass);
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), inputFormat);
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS),
- recordReaderClass);
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS),
- recordReaderConfigClass);
- batchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region",
- region);
- batchConfigMap.put(
- BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username",
- username);
- batchConfigMap.put(
- BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
- + ".separator", separator);
+ batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir);
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir);
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, inputFsClass);
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, outputFsClass);
+ batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, inputFormat);
+ batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, recordReaderClass);
+ batchConfigMap.put(BatchConfigProperties.RECORD_READER_CONFIG_CLASS, recordReaderConfigClass);
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region", region);
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username", username);
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region", region);
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey", accessKey);
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey", secretKey);
+ batchConfigMap.put(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator", separator);
// config with all the right properties
BatchConfig batchConfig = new BatchConfig(tableName, batchConfigMap);
- assertEquals(batchConfig.getType(), batchType);
assertEquals(batchConfig.getInputDirURI(), inputDir);
assertEquals(batchConfig.getOutputDirURI(), outputDir);
- assertEquals(batchConfig.getFsClassName(), fsClass);
+ assertEquals(batchConfig.getInputFsClassName(), inputFsClass);
+ assertEquals(batchConfig.getOutputFsClassName(), outputFsClass);
assertEquals(batchConfig.getInputFormat(), FileFormat.CSV);
assertEquals(batchConfig.getRecordReaderClassName(), recordReaderClass);
assertEquals(batchConfig.getRecordReaderConfigClassName(), recordReaderConfigClass);
- assertEquals(batchConfig.getFsProps().size(), 2);
- assertEquals(batchConfig.getFsProps().get(
- BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region"),
- region);
- assertEquals(batchConfig.getFsProps().get(
- BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username"),
- username);
+ assertEquals(batchConfig.getInputFsProps().size(), 2);
+ assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region"), region);
+ assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username"), username);
+ assertEquals(batchConfig.getOutputFsProps().size(), 3);
+ assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region"), region);
+ assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey"), accessKey);
+ assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey"), secretKey);
assertEquals(batchConfig.getRecordReaderProps().size(), 1);
- assertEquals(batchConfig.getRecordReaderProps().get(
- BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
- + ".separator"), separator);
+ assertEquals(batchConfig.getRecordReaderProps().get(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator"),
+ separator);
assertEquals(batchConfig.getTableNameWithType(), tableName);
// Missing props
Map<String, String> testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
- try {
- new BatchConfig(tableName, testBatchConfigMap);
- Assert.fail("Should fail for missing 'batchType");
- } catch (IllegalStateException e) {
- // expected
- }
-
- testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap
- .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI));
- try {
- new BatchConfig(tableName, testBatchConfigMap);
- Assert.fail("Should fail for missing 'inputDirURI");
- } catch (IllegalStateException e) {
- // expected
- }
-
- testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap
- .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI));
- try {
- new BatchConfig(tableName, testBatchConfigMap);
- Assert.fail("Should fail for missing 'outputDirURI");
- } catch (IllegalStateException e) {
- // expected
- }
-
- testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap
- .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT));
+ testBatchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT);
try {
new BatchConfig(tableName, testBatchConfigMap);
Assert.fail("Should fail for missing 'inputFormat");
} catch (IllegalStateException e) {
// expected
}
-
- testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap
- .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), "moo");
- try {
- new BatchConfig(tableName, testBatchConfigMap);
- Assert.fail("Should fail for incorrect 'inputFormat");
- } catch (IllegalArgumentException e) {
- // expected
- }
-
- testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap
- .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS));
- try {
- new BatchConfig(tableName, testBatchConfigMap);
- Assert.fail("Should fail for missing 'recordReaderClassName");
- } catch (IllegalStateException e) {
- // expected
- }
-
- testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap
- .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
- new BatchConfig(tableName, testBatchConfigMap);
-
- testBatchConfigMap = new HashMap<>(batchConfigMap);
- testBatchConfigMap
- .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS));
- try {
- new BatchConfig(tableName, testBatchConfigMap);
- Assert.fail("Should fail for missing 'fsClassName");
- } catch (IllegalStateException e) {
- // expected
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org