You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/08 06:42:01 UTC

[incubator-pinot] branch adding_pinot_minion_segment_creation_tasks_1 created (now adc0f0c)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch adding_pinot_minion_segment_creation_tasks_1
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at adc0f0c  simplify batch config and corresponding utils

This branch includes the following new commits:

     new adc0f0c  simplify batch config and corresponding utils

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: simplify batch config and corresponding utils

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_1
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit adc0f0c31bc3bfe4e99a1100265e72401d114ad6
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Dec 7 22:40:40 2020 -0800

    simplify batch config and corresponding utils
---
 .../pinot/core/util/TableConfigUtilsTest.java      | 18 ++---
 .../pinot/spi/filesystem/PinotFSFactory.java       |  2 +-
 .../pinot/spi/ingestion/batch/BatchConfig.java     | 52 ++++--------
 .../spi/ingestion/batch/BatchConfigProperties.java | 38 ++++++---
 .../pinot/spi/utils/IngestionConfigUtils.java      | 42 +++++++++-
 .../pinot/spi/ingestion/batch/BatchConfigTest.java | 94 ++++------------------
 6 files changed, 108 insertions(+), 138 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index ae313c0..362cc8f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -377,16 +377,11 @@ public class TableConfigUtilsTest {
   @Test
   public void ingestionBatchConfigsTest() {
     Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, "s3");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_DIR_URI), "s3://foo");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.OUTPUT_DIR_URI), "s3://bar");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.FS_CLASS), "org.foo.S3FS");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_FORMAT), "avro");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.RECORD_READER_CLASS),
-        "org.foo.Reader");
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "s3://bar");
+    batchConfigMap.put(BatchConfigProperties.FS_CLASS, "org.foo.S3FS");
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, "org.foo.Reader");
 
     IngestionConfig ingestionConfig =
         new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, null),
@@ -395,8 +390,7 @@ public class TableConfigUtilsTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
             .build();
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
-
-    batchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
+    batchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for invalid batch config map");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index b163f5c..6366d95 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -38,8 +38,8 @@ public class PinotFSFactory {
   private PinotFSFactory() {
   }
 
+  public static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class);
-  private static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final String CLASS = "class";
   private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() {
     {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index 965eb9a..bd4731a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -28,8 +28,9 @@ import org.apache.pinot.spi.data.readers.FileFormat;
  * Provides all config related to the batch data source, as configured in the table config's ingestion config
  */
 public class BatchConfig {
+  private final Map<String, String> _batchConfigMap;
+
   private final String _tableNameWithType;
-  private final String _type;
   private final String _inputDirURI;
   private final String _outputDirURI;
   private final String _fsClassName;
@@ -39,51 +40,32 @@ public class BatchConfig {
   private final String _recordReaderConfigClassName;
   private final Map<String, String> _recordReaderProps = new HashMap<>();
 
-  private final Map<String, String> _batchConfigMap = new HashMap<>();
-
   public BatchConfig(String tableNameWithType, Map<String, String> batchConfigsMap) {
-    _tableNameWithType = tableNameWithType;
+    _batchConfigMap = batchConfigsMap;
 
-    _type = batchConfigsMap.get(BatchConfigProperties.BATCH_TYPE);
-    Preconditions.checkState(_type != null, "Property: %s cannot be null", BatchConfigProperties.BATCH_TYPE);
+    _tableNameWithType = tableNameWithType;
 
-    String inputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_DIR_URI);
-    _inputDirURI = batchConfigsMap.get(inputDirURIKey);
-    Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", inputDirURIKey);
+    _inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
+    Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_DIR_URI);
 
-    String outputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.OUTPUT_DIR_URI);
-    _outputDirURI = batchConfigsMap.get(outputDirURIKey);
-    Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", outputDirURIKey);
+    _outputDirURI = batchConfigsMap.get(BatchConfigProperties.OUTPUT_DIR_URI);
+    Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.OUTPUT_DIR_URI);
 
-    String fsClassNameKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_CLASS);
-    _fsClassName = batchConfigsMap.get(fsClassNameKey);
-    Preconditions.checkState(_fsClassName != null, "Property: %s cannot be null", fsClassNameKey);
+    _fsClassName = batchConfigsMap.get(BatchConfigProperties.FS_CLASS);
 
-    String inputFormatKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_FORMAT);
-    String inputFormat = batchConfigsMap.get(inputFormatKey);
-    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", inputFormat);
+    String inputFormat = batchConfigsMap.get(BatchConfigProperties.INPUT_FORMAT);
+    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_FORMAT);
     _inputFormat = FileFormat.valueOf(inputFormat.toUpperCase());
 
-    String recordReaderClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CLASS);
-    _recordReaderClassName = batchConfigsMap.get(recordReaderClassNameKey);
-    Preconditions.checkState(_recordReaderClassName != null, "Property: %s cannot be null", recordReaderClassNameKey);
-
-    String recordReaderConfigClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
-    _recordReaderConfigClassName = batchConfigsMap.get(recordReaderConfigClassNameKey);
-
-    String fsPropPrefix = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_PROP_PREFIX);
-    String recordReaderPropPrefix =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_PROP_PREFIX);
+    _recordReaderClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CLASS);
+    _recordReaderConfigClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
     for (Map.Entry<String, String> entry : batchConfigsMap.entrySet()) {
       String key = entry.getKey();
-      if (key.startsWith(fsPropPrefix)) {
+      if (key.startsWith(BatchConfigProperties.FS_PROP_PREFIX)) {
         _fsProps.put(key, entry.getValue());
-      } else if (key.startsWith(recordReaderPropPrefix)) {
+      } else if (key.startsWith(BatchConfigProperties.RECORD_READER_PROP_PREFIX)) {
         _recordReaderProps.put(key, entry.getValue());
       }
-      _batchConfigMap.put(key, entry.getValue());
     }
   }
 
@@ -91,10 +73,6 @@ public class BatchConfig {
     return _tableNameWithType;
   }
 
-  public String getType() {
-    return _type;
-  }
-
   public String getInputDirURI() {
     return _inputDirURI;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5d276aa..857ed70 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -26,25 +26,45 @@ import org.apache.commons.lang3.StringUtils;
  */
 public class BatchConfigProperties {
 
-  public static final String DOT_SEPARATOR = ".";
-  public static final String BATCH_PREFIX = "batch";
+  public static final String TABLE_CONFIGS = "tableConfigs";
+  public static final String TABLE_NAME = "tableName";
 
-  public static final String BATCH_TYPE = "batchType";
   public static final String INPUT_DIR_URI = "inputDirURI";
   public static final String OUTPUT_DIR_URI = "outputDirURI";
   public static final String FS_CLASS = "fs.className";
   public static final String FS_PROP_PREFIX = "fs.prop";
   public static final String INPUT_FORMAT = "inputFormat";
+  public static final String INCLUDE_FILE_NAME_PATTERN = "includeFileNamePattern";
+  public static final String EXCLUDE_FILE_NAME_PATTERN = "excludeFileNamePattern";
   public static final String RECORD_READER_CLASS = "recordReader.className";
   public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
   public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
-
+  public static final String SCHEMA = "schema";
+  public static final String SCHEMA_URI = "schemaURI";
+  public static final String SEQUENCE_ID = "sequenceId";
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = "segmentNameGenerator.type";
+  public static final String SEGMENT_NAME_GENERATOR_CONFIGS = "segmentNameGenerator.configs";
+  public static final String OVERWRITE_OUTPUT = "overwriteOutput";
   public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
-  /**
-   * Helper method to create a batch config property
-   */
-  public static String constructBatchProperty(String batchType, String property) {
-    return StringUtils.join(BATCH_PREFIX, batchType, property, DOT_SEPARATOR);
+  public static final String PUSH_MODE = "push.mode";
+  public static final String PUSH_CONTROLLER_URI = "push.controller.uri";
+  public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
+  public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+
+  public static final String INPUT_FILE_URI = "input.file.uri";
+  public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
+
+  public enum SegmentIngestionType {
+    APPEND, REPLACE
   }
 
+  public class SegmentNameGeneratorType {
+    public static final String SIMPLE = "simple";
+    public static final String NORMALIZED_DATE = "normalizedDate";
+    public static final String FIXED = "fixed";
+  }
+
+  public enum SegmentPushType {
+    TAR, URI, METADATA
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 62499dd..f14895e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -19,17 +19,23 @@
 package org.apache.pinot.spi.utils;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 
 
 /**
  * Helper methods for extracting fields from IngestionConfig in a backward compatible manner
  */
 public final class IngestionConfigUtils {
+  public static final String DOT_SEPARATOR = ".";
+  private static final String DEFAULT_PUSH_MODE = "metadata";
+
   /**
    * Fetches the streamConfig from the given realtime table.
    * First, the ingestionConfigs->stream->streamConfigs will be checked.
@@ -42,8 +48,10 @@ public final class IngestionConfigUtils {
     Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
         "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
     Map<String, String> streamConfigMap = null;
-    if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
-      List<Map<String, String>> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
+      List<Map<String, String>> streamConfigMaps =
+          tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
       Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table");
       streamConfigMap = streamConfigMaps.get(0);
     }
@@ -92,4 +100,34 @@ public final class IngestionConfigUtils {
     return segmentIngestionFrequency;
   }
 
+  public static PinotConfiguration getFsProps(Map<String, String> batchConfigMap) {
+    return new PinotConfiguration(getPropsWithPrefix(batchConfigMap, BatchConfigProperties.FS_PROP_PREFIX + DOT_SEPARATOR));
+  }
+
+  public static Map<String, Object> getPropsWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    Map<String, Object> props = new HashMap<>();
+    props.putAll(getConfigMapWithPrefix(batchConfigMap, prefix));
+    return props;
+  }
+
+  public static Map<String, String> getConfigMapWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    Map<String, String> props = new HashMap<>();
+    for (String configKey : batchConfigMap.keySet()) {
+      if (configKey.startsWith(prefix)) {
+        String[] splits = configKey.split(prefix, 2);
+        if (splits.length > 1) {
+          props.put(splits[1], batchConfigMap.get(configKey));
+        }
+      }
+    }
+    return props;
+  }
+
+  public static String getPushMode(Map<String, String> batchConfigMap) {
+    String pushMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE);
+    if (pushMode == null) {
+      pushMode = DEFAULT_PUSH_MODE;
+    }
+    return pushMode;
+  }
 }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
index bc34f1a..75bf417 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
@@ -36,7 +36,6 @@ public class BatchConfigTest {
   public void testBatchConfig() {
     Map<String, String> batchConfigMap = new HashMap<>();
     String tableName = "foo_REALTIME";
-    String batchType = "s3";
     String inputDir = "s3://foo/input";
     String outputDir = "s3://foo/output";
     String fsClass = "org.apache.S3FS";
@@ -46,34 +45,18 @@ public class BatchConfigTest {
     String recordReaderClass = "org.foo.CSVRecordReader";
     String recordReaderConfigClass = "org.foo.CSVRecordReaderConfig";
     String separator = "|";
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, batchType);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI), inputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI), outputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS), fsClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), inputFormat);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS),
-            recordReaderClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS),
-            recordReaderConfigClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region",
-            region);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username",
-        username);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator", separator);
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir);
+    batchConfigMap.put(BatchConfigProperties.FS_CLASS, fsClass);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, inputFormat);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, recordReaderClass);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CONFIG_CLASS, recordReaderConfigClass);
+    batchConfigMap.put(BatchConfigProperties.FS_PROP_PREFIX + ".region", region);
+    batchConfigMap.put(BatchConfigProperties.FS_PROP_PREFIX + ".username", username);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator", separator);
 
     // config with all the right properties
     BatchConfig batchConfig = new BatchConfig(tableName, batchConfigMap);
-    assertEquals(batchConfig.getType(), batchType);
     assertEquals(batchConfig.getInputDirURI(), inputDir);
     assertEquals(batchConfig.getOutputDirURI(), outputDir);
     assertEquals(batchConfig.getFsClassName(), fsClass);
@@ -81,31 +64,16 @@ public class BatchConfigTest {
     assertEquals(batchConfig.getRecordReaderClassName(), recordReaderClass);
     assertEquals(batchConfig.getRecordReaderConfigClassName(), recordReaderConfigClass);
     assertEquals(batchConfig.getFsProps().size(), 2);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region"),
-        region);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username"),
-        username);
+    assertEquals(batchConfig.getFsProps().get(BatchConfigProperties.FS_PROP_PREFIX + ".region"), region);
+    assertEquals(batchConfig.getFsProps().get(BatchConfigProperties.FS_PROP_PREFIX + ".username"), username);
     assertEquals(batchConfig.getRecordReaderProps().size(), 1);
-    assertEquals(batchConfig.getRecordReaderProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator"), separator);
+    assertEquals(batchConfig.getRecordReaderProps().get(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator"),
+        separator);
     assertEquals(batchConfig.getTableNameWithType(), tableName);
 
     // Missing props
     Map<String, String> testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'batchType");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI));
+    testBatchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'inputDirURI");
@@ -114,8 +82,7 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI));
+    testBatchConfigMap.remove(BatchConfigProperties.OUTPUT_DIR_URI);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'outputDirURI");
@@ -124,8 +91,7 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT));
+    testBatchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'inputFormat");
@@ -134,38 +100,12 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), "moo");
+    testBatchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "moo");
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for incorrect 'inputFormat");
     } catch (IllegalArgumentException e) {
       // expected
     }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'recordReaderClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
-    new BatchConfig(tableName, testBatchConfigMap);
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'fsClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org