You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/14 19:45:38 UTC

[incubator-pinot] branch master updated: simplify batch config and corresponding utils (#6332)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4183ffe  simplify batch config and corresponding utils (#6332)
4183ffe is described below

commit 4183ffe71c19944312a99555392033cbd7481ac4
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Dec 14 11:45:04 2020 -0800

    simplify batch config and corresponding utils (#6332)
---
 .../pinot/core/util/TableConfigUtilsTest.java      |  19 ++-
 .../pinot/spi/filesystem/PinotFSFactory.java       |   2 +-
 .../pinot/spi/ingestion/batch/BatchConfig.java     |  83 ++++++-------
 .../spi/ingestion/batch/BatchConfigProperties.java |  49 +++++---
 .../pinot/spi/utils/IngestionConfigUtils.java      |  42 ++++++-
 .../pinot/spi/ingestion/batch/BatchConfigTest.java | 133 +++++----------------
 6 files changed, 145 insertions(+), 183 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index ae313c0..f269cae 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -377,16 +377,12 @@ public class TableConfigUtilsTest {
   @Test
   public void ingestionBatchConfigsTest() {
     Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, "s3");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_DIR_URI), "s3://foo");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.OUTPUT_DIR_URI), "s3://bar");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.FS_CLASS), "org.foo.S3FS");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_FORMAT), "avro");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.RECORD_READER_CLASS),
-        "org.foo.Reader");
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, "org.foo.S3FS");
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, "org.foo.GcsFS");
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, "org.foo.Reader");
 
     IngestionConfig ingestionConfig =
         new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, null),
@@ -395,8 +391,7 @@ public class TableConfigUtilsTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
             .build();
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
-
-    batchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
+    batchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for invalid batch config map");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index b163f5c..6366d95 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -38,8 +38,8 @@ public class PinotFSFactory {
   private PinotFSFactory() {
   }
 
+  public static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class);
-  private static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final String CLASS = "class";
   private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() {
     {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index 965eb9a..780f711 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -28,62 +28,43 @@ import org.apache.pinot.spi.data.readers.FileFormat;
  * Provides all config related to the batch data source, as configured in the table config's ingestion config
  */
 public class BatchConfig {
+  private final Map<String, String> _batchConfigMap;
+
   private final String _tableNameWithType;
-  private final String _type;
   private final String _inputDirURI;
   private final String _outputDirURI;
-  private final String _fsClassName;
-  private final Map<String, String> _fsProps = new HashMap<>();
+  private final String _inputFsClassName;
+  private final Map<String, String> _inputFsProps = new HashMap<>();
+  private final String _outputFsClassName;
+  private final Map<String, String> _outputFsProps = new HashMap<>();
   private final FileFormat _inputFormat;
   private final String _recordReaderClassName;
   private final String _recordReaderConfigClassName;
   private final Map<String, String> _recordReaderProps = new HashMap<>();
 
-  private final Map<String, String> _batchConfigMap = new HashMap<>();
-
   public BatchConfig(String tableNameWithType, Map<String, String> batchConfigsMap) {
+    _batchConfigMap = batchConfigsMap;
     _tableNameWithType = tableNameWithType;
-
-    _type = batchConfigsMap.get(BatchConfigProperties.BATCH_TYPE);
-    Preconditions.checkState(_type != null, "Property: %s cannot be null", BatchConfigProperties.BATCH_TYPE);
-
-    String inputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_DIR_URI);
-    _inputDirURI = batchConfigsMap.get(inputDirURIKey);
-    Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", inputDirURIKey);
-
-    String outputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.OUTPUT_DIR_URI);
-    _outputDirURI = batchConfigsMap.get(outputDirURIKey);
-    Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", outputDirURIKey);
-
-    String fsClassNameKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_CLASS);
-    _fsClassName = batchConfigsMap.get(fsClassNameKey);
-    Preconditions.checkState(_fsClassName != null, "Property: %s cannot be null", fsClassNameKey);
-
-    String inputFormatKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_FORMAT);
-    String inputFormat = batchConfigsMap.get(inputFormatKey);
-    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", inputFormat);
+    String inputFormat = batchConfigsMap.get(BatchConfigProperties.INPUT_FORMAT);
+    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_FORMAT);
     _inputFormat = FileFormat.valueOf(inputFormat.toUpperCase());
-
-    String recordReaderClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CLASS);
-    _recordReaderClassName = batchConfigsMap.get(recordReaderClassNameKey);
-    Preconditions.checkState(_recordReaderClassName != null, "Property: %s cannot be null", recordReaderClassNameKey);
-
-    String recordReaderConfigClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
-    _recordReaderConfigClassName = batchConfigsMap.get(recordReaderConfigClassNameKey);
-
-    String fsPropPrefix = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_PROP_PREFIX);
-    String recordReaderPropPrefix =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_PROP_PREFIX);
+    _inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
+    _inputFsClassName = batchConfigsMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+    _outputDirURI = batchConfigsMap.get(BatchConfigProperties.OUTPUT_DIR_URI);
+    _outputFsClassName = batchConfigsMap.get(BatchConfigProperties.OUTPUT_FS_CLASS);
+    _recordReaderClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CLASS);
+    _recordReaderConfigClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
     for (Map.Entry<String, String> entry : batchConfigsMap.entrySet()) {
       String key = entry.getKey();
-      if (key.startsWith(fsPropPrefix)) {
-        _fsProps.put(key, entry.getValue());
-      } else if (key.startsWith(recordReaderPropPrefix)) {
+      if (key.startsWith(BatchConfigProperties.INPUT_FS_PROP_PREFIX)) {
+        _inputFsProps.put(key, entry.getValue());
+      }
+      if (key.startsWith(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX)) {
+        _outputFsProps.put(key, entry.getValue());
+      }
+      if (key.startsWith(BatchConfigProperties.RECORD_READER_PROP_PREFIX)) {
         _recordReaderProps.put(key, entry.getValue());
       }
-      _batchConfigMap.put(key, entry.getValue());
     }
   }
 
@@ -91,10 +72,6 @@ public class BatchConfig {
     return _tableNameWithType;
   }
 
-  public String getType() {
-    return _type;
-  }
-
   public String getInputDirURI() {
     return _inputDirURI;
   }
@@ -103,12 +80,20 @@ public class BatchConfig {
     return _outputDirURI;
   }
 
-  public String getFsClassName() {
-    return _fsClassName;
+  public String getInputFsClassName() {
+    return _inputFsClassName;
+  }
+
+  public Map<String, String> getInputFsProps() {
+    return _inputFsProps;
+  }
+
+  public String getOutputFsClassName() {
+    return _outputFsClassName;
   }
 
-  public Map<String, String> getFsProps() {
-    return _fsProps;
+  public Map<String, String> getOutputFsProps() {
+    return _outputFsProps;
   }
 
   public FileFormat getInputFormat() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5d276aa..2d83335 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -18,33 +18,52 @@
  */
 package org.apache.pinot.spi.ingestion.batch;
 
-import org.apache.commons.lang3.StringUtils;
-
-
 /**
  * Defines all the keys used in the batch configs map
  */
 public class BatchConfigProperties {
 
-  public static final String DOT_SEPARATOR = ".";
-  public static final String BATCH_PREFIX = "batch";
+  public static final String TABLE_CONFIGS = "tableConfigs";
+  public static final String TABLE_NAME = "tableName";
 
-  public static final String BATCH_TYPE = "batchType";
   public static final String INPUT_DIR_URI = "inputDirURI";
   public static final String OUTPUT_DIR_URI = "outputDirURI";
-  public static final String FS_CLASS = "fs.className";
-  public static final String FS_PROP_PREFIX = "fs.prop";
+  public static final String INPUT_FS_CLASS = "input.fs.className";
+  public static final String INPUT_FS_PROP_PREFIX = "input.fs.prop";
+  public static final String OUTPUT_FS_CLASS = "output.fs.className";
+  public static final String OUTPUT_FS_PROP_PREFIX = "output.fs.prop";
   public static final String INPUT_FORMAT = "inputFormat";
+  public static final String INCLUDE_FILE_NAME_PATTERN = "includeFileNamePattern";
+  public static final String EXCLUDE_FILE_NAME_PATTERN = "excludeFileNamePattern";
   public static final String RECORD_READER_CLASS = "recordReader.className";
-  public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
+  public static final String RECORD_READER_CONFIG_CLASS = "recordReader.configClassName";
   public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
-
+  public static final String SCHEMA = "schema";
+  public static final String SCHEMA_URI = "schemaURI";
+  public static final String SEQUENCE_ID = "sequenceId";
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = "segmentNameGenerator.type";
+  public static final String SEGMENT_NAME_GENERATOR_CONFIGS = "segmentNameGenerator.configs";
+  public static final String OVERWRITE_OUTPUT = "overwriteOutput";
   public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
-  /**
-   * Helper method to create a batch config property
-   */
-  public static String constructBatchProperty(String batchType, String property) {
-    return StringUtils.join(BATCH_PREFIX, batchType, property, DOT_SEPARATOR);
+  public static final String PUSH_MODE = "push.mode";
+  public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
+  public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
+  public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+
+  public static final String INPUT_FILE_URI = "input.file.uri";
+  public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
+
+  public enum SegmentIngestionType {
+    APPEND, REPLACE
+  }
+
+  public static class SegmentNameGeneratorType { // static: a constants holder needs no enclosing instance
+    public static final String SIMPLE = "simple";
+    public static final String NORMALIZED_DATE = "normalizedDate";
+    public static final String FIXED = "fixed";
+  }
 
+  public enum SegmentPushType {
+    TAR, URI, METADATA
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 62499dd..84e0dbe 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -19,17 +19,23 @@
 package org.apache.pinot.spi.utils;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 
 
 /**
  * Helper methods for extracting fields from IngestionConfig in a backward compatible manner
  */
 public final class IngestionConfigUtils {
+  public static final String DOT_SEPARATOR = ".";
+  private static final String DEFAULT_PUSH_MODE = "metadata";
+
   /**
    * Fetches the streamConfig from the given realtime table.
    * First, the ingestionConfigs->stream->streamConfigs will be checked.
@@ -42,8 +48,10 @@ public final class IngestionConfigUtils {
     Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
         "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
     Map<String, String> streamConfigMap = null;
-    if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
-      List<Map<String, String>> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
+      List<Map<String, String>> streamConfigMaps =
+          tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
       Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table");
       streamConfigMap = streamConfigMaps.get(0);
     }
@@ -92,4 +100,34 @@ public final class IngestionConfigUtils {
     return segmentIngestionFrequency;
   }
 
+  /** Builds a PinotConfiguration from the entries under INPUT_FS_PROP_PREFIX + ".", keyed by the rest of each key. */
+  public static PinotConfiguration getFsProps(Map<String, String> batchConfigMap) {
+    String inputFsPropPrefix = BatchConfigProperties.INPUT_FS_PROP_PREFIX + DOT_SEPARATOR;
+    return new PinotConfiguration(getPropsWithPrefix(batchConfigMap, inputFsPropPrefix));
+  }
+
+  public static Map<String, Object> getPropsWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    return new HashMap<>(getConfigMapWithPrefix(batchConfigMap, prefix));
+  }
+
+  /** Returns the entries of batchConfigMap whose key starts with prefix, re-keyed by the remainder after prefix. */
+  public static Map<String, String> getConfigMapWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    Map<String, String> props = new HashMap<>();
+    for (Map.Entry<String, String> entry : batchConfigMap.entrySet()) {
+      String configKey = entry.getKey();
+      if (configKey.startsWith(prefix)) {
+        // Strip the prefix literally; String.split would interpret it as a regular expression.
+        props.put(configKey.substring(prefix.length()), entry.getValue());
+      }
+    }
+    return props;
+  }
+
+  /**
+   * Returns the push mode from the batch config map, or DEFAULT_PUSH_MODE when the key is absent or null.
+   */
+  public static String getPushMode(Map<String, String> batchConfigMap) {
+    String configured = batchConfigMap.get(BatchConfigProperties.PUSH_MODE);
+    return configured != null ? configured : DEFAULT_PUSH_MODE;
+  }
 }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
index bc34f1a..851c552 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
@@ -36,136 +36,61 @@ public class BatchConfigTest {
   public void testBatchConfig() {
     Map<String, String> batchConfigMap = new HashMap<>();
     String tableName = "foo_REALTIME";
-    String batchType = "s3";
     String inputDir = "s3://foo/input";
     String outputDir = "s3://foo/output";
-    String fsClass = "org.apache.S3FS";
+    String inputFsClass = "org.apache.S3FS";
+    String outputFsClass = "org.apache.GcsGS";
     String region = "us-west";
     String username = "foo";
+    String accessKey = "${ACCESS_KEY}";
+    String secretKey = "${SECRET_KEY}";
     String inputFormat = "csv";
     String recordReaderClass = "org.foo.CSVRecordReader";
     String recordReaderConfigClass = "org.foo.CSVRecordReaderConfig";
     String separator = "|";
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, batchType);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI), inputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI), outputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS), fsClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), inputFormat);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS),
-            recordReaderClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS),
-            recordReaderConfigClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region",
-            region);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username",
-        username);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator", separator);
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, inputFsClass);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_CLASS, outputFsClass);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, inputFormat);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, recordReaderClass);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CONFIG_CLASS, recordReaderConfigClass);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region", region);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username", username);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region", region);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey", accessKey);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey", secretKey);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator", separator);
 
     // config with all the right properties
     BatchConfig batchConfig = new BatchConfig(tableName, batchConfigMap);
-    assertEquals(batchConfig.getType(), batchType);
     assertEquals(batchConfig.getInputDirURI(), inputDir);
     assertEquals(batchConfig.getOutputDirURI(), outputDir);
-    assertEquals(batchConfig.getFsClassName(), fsClass);
+    assertEquals(batchConfig.getInputFsClassName(), inputFsClass);
+    assertEquals(batchConfig.getOutputFsClassName(), outputFsClass);
     assertEquals(batchConfig.getInputFormat(), FileFormat.CSV);
     assertEquals(batchConfig.getRecordReaderClassName(), recordReaderClass);
     assertEquals(batchConfig.getRecordReaderConfigClassName(), recordReaderConfigClass);
-    assertEquals(batchConfig.getFsProps().size(), 2);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region"),
-        region);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username"),
-        username);
+    assertEquals(batchConfig.getInputFsProps().size(), 2);
+    assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".region"), region);
+    assertEquals(batchConfig.getInputFsProps().get(BatchConfigProperties.INPUT_FS_PROP_PREFIX + ".username"), username);
+    assertEquals(batchConfig.getOutputFsProps().size(), 3);
+    assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".region"), region);
+    assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".accessKey"), accessKey);
+    assertEquals(batchConfig.getOutputFsProps().get(BatchConfigProperties.OUTPUT_FS_PROP_PREFIX + ".secretKey"), secretKey);
     assertEquals(batchConfig.getRecordReaderProps().size(), 1);
-    assertEquals(batchConfig.getRecordReaderProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator"), separator);
+    assertEquals(batchConfig.getRecordReaderProps().get(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator"),
+        separator);
     assertEquals(batchConfig.getTableNameWithType(), tableName);
 
     // Missing props
     Map<String, String> testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'batchType");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'inputDirURI");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'outputDirURI");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT));
+    testBatchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'inputFormat");
     } catch (IllegalStateException e) {
       // expected
     }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), "moo");
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for incorrect 'inputFormat");
-    } catch (IllegalArgumentException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'recordReaderClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
-    new BatchConfig(tableName, testBatchConfigMap);
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'fsClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org