You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/08 06:42:02 UTC

[incubator-pinot] 01/01: simplify batch config and corresponding utils

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_pinot_minion_segment_creation_tasks_1
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit adc0f0c31bc3bfe4e99a1100265e72401d114ad6
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Dec 7 22:40:40 2020 -0800

    simplify batch config and corresponding utils
---
 .../pinot/core/util/TableConfigUtilsTest.java      | 18 ++---
 .../pinot/spi/filesystem/PinotFSFactory.java       |  2 +-
 .../pinot/spi/ingestion/batch/BatchConfig.java     | 52 ++++--------
 .../spi/ingestion/batch/BatchConfigProperties.java | 38 ++++++---
 .../pinot/spi/utils/IngestionConfigUtils.java      | 42 +++++++++-
 .../pinot/spi/ingestion/batch/BatchConfigTest.java | 94 ++++------------------
 6 files changed, 108 insertions(+), 138 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index ae313c0..362cc8f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -377,16 +377,11 @@ public class TableConfigUtilsTest {
   @Test
   public void ingestionBatchConfigsTest() {
     Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, "s3");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_DIR_URI), "s3://foo");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.OUTPUT_DIR_URI), "s3://bar");
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.FS_CLASS), "org.foo.S3FS");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.INPUT_FORMAT), "avro");
-    batchConfigMap.put(BatchConfigProperties.constructBatchProperty("s3", BatchConfigProperties.RECORD_READER_CLASS),
-        "org.foo.Reader");
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "s3://bar");
+    batchConfigMap.put(BatchConfigProperties.FS_CLASS, "org.foo.S3FS");
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, "org.foo.Reader");
 
     IngestionConfig ingestionConfig =
         new IngestionConfig(new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, null),
@@ -395,8 +390,7 @@ public class TableConfigUtilsTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
             .build();
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
-
-    batchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
+    batchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for invalid batch config map");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index b163f5c..6366d95 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -38,8 +38,8 @@ public class PinotFSFactory {
   private PinotFSFactory() {
   }
 
+  public static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotFSFactory.class);
-  private static final String LOCAL_PINOT_FS_SCHEME = "file";
   private static final String CLASS = "class";
   private static final Map<String, PinotFS> PINOT_FS_MAP = new HashMap<String, PinotFS>() {
     {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index 965eb9a..bd4731a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -28,8 +28,9 @@ import org.apache.pinot.spi.data.readers.FileFormat;
  * Provides all config related to the batch data source, as configured in the table config's ingestion config
  */
 public class BatchConfig {
+  private final Map<String, String> _batchConfigMap;
+
   private final String _tableNameWithType;
-  private final String _type;
   private final String _inputDirURI;
   private final String _outputDirURI;
   private final String _fsClassName;
@@ -39,51 +40,32 @@ public class BatchConfig {
   private final String _recordReaderConfigClassName;
   private final Map<String, String> _recordReaderProps = new HashMap<>();
 
-  private final Map<String, String> _batchConfigMap = new HashMap<>();
-
   public BatchConfig(String tableNameWithType, Map<String, String> batchConfigsMap) {
-    _tableNameWithType = tableNameWithType;
+    _batchConfigMap = batchConfigsMap;
 
-    _type = batchConfigsMap.get(BatchConfigProperties.BATCH_TYPE);
-    Preconditions.checkState(_type != null, "Property: %s cannot be null", BatchConfigProperties.BATCH_TYPE);
+    _tableNameWithType = tableNameWithType;
 
-    String inputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_DIR_URI);
-    _inputDirURI = batchConfigsMap.get(inputDirURIKey);
-    Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", inputDirURIKey);
+    _inputDirURI = batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
+    Preconditions.checkState(_inputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_DIR_URI);
 
-    String outputDirURIKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.OUTPUT_DIR_URI);
-    _outputDirURI = batchConfigsMap.get(outputDirURIKey);
-    Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", outputDirURIKey);
+    _outputDirURI = batchConfigsMap.get(BatchConfigProperties.OUTPUT_DIR_URI);
+    Preconditions.checkState(_outputDirURI != null, "Property: %s cannot be null", BatchConfigProperties.OUTPUT_DIR_URI);
 
-    String fsClassNameKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_CLASS);
-    _fsClassName = batchConfigsMap.get(fsClassNameKey);
-    Preconditions.checkState(_fsClassName != null, "Property: %s cannot be null", fsClassNameKey);
+    _fsClassName = batchConfigsMap.get(BatchConfigProperties.FS_CLASS);
 
-    String inputFormatKey = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.INPUT_FORMAT);
-    String inputFormat = batchConfigsMap.get(inputFormatKey);
-    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", inputFormat);
+    String inputFormat = batchConfigsMap.get(BatchConfigProperties.INPUT_FORMAT);
+    Preconditions.checkState(inputFormat != null, "Property: %s cannot be null", BatchConfigProperties.INPUT_FORMAT);
     _inputFormat = FileFormat.valueOf(inputFormat.toUpperCase());
 
-    String recordReaderClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CLASS);
-    _recordReaderClassName = batchConfigsMap.get(recordReaderClassNameKey);
-    Preconditions.checkState(_recordReaderClassName != null, "Property: %s cannot be null", recordReaderClassNameKey);
-
-    String recordReaderConfigClassNameKey =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
-    _recordReaderConfigClassName = batchConfigsMap.get(recordReaderConfigClassNameKey);
-
-    String fsPropPrefix = BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.FS_PROP_PREFIX);
-    String recordReaderPropPrefix =
-        BatchConfigProperties.constructBatchProperty(_type, BatchConfigProperties.RECORD_READER_PROP_PREFIX);
+    _recordReaderClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CLASS);
+    _recordReaderConfigClassName = batchConfigsMap.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS);
     for (Map.Entry<String, String> entry : batchConfigsMap.entrySet()) {
       String key = entry.getKey();
-      if (key.startsWith(fsPropPrefix)) {
+      if (key.startsWith(BatchConfigProperties.FS_PROP_PREFIX)) {
         _fsProps.put(key, entry.getValue());
-      } else if (key.startsWith(recordReaderPropPrefix)) {
+      } else if (key.startsWith(BatchConfigProperties.RECORD_READER_PROP_PREFIX)) {
         _recordReaderProps.put(key, entry.getValue());
       }
-      _batchConfigMap.put(key, entry.getValue());
     }
   }
 
@@ -91,10 +73,6 @@ public class BatchConfig {
     return _tableNameWithType;
   }
 
-  public String getType() {
-    return _type;
-  }
-
   public String getInputDirURI() {
     return _inputDirURI;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 5d276aa..857ed70 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -26,25 +26,45 @@ import org.apache.commons.lang3.StringUtils;
  */
 public class BatchConfigProperties {
 
-  public static final String DOT_SEPARATOR = ".";
-  public static final String BATCH_PREFIX = "batch";
+  public static final String TABLE_CONFIGS = "tableConfigs";
+  public static final String TABLE_NAME = "tableName";
 
-  public static final String BATCH_TYPE = "batchType";
   public static final String INPUT_DIR_URI = "inputDirURI";
   public static final String OUTPUT_DIR_URI = "outputDirURI";
   public static final String FS_CLASS = "fs.className";
   public static final String FS_PROP_PREFIX = "fs.prop";
   public static final String INPUT_FORMAT = "inputFormat";
+  public static final String INCLUDE_FILE_NAME_PATTERN = "includeFileNamePattern";
+  public static final String EXCLUDE_FILE_NAME_PATTERN = "excludeFileNamePattern";
   public static final String RECORD_READER_CLASS = "recordReader.className";
   public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className";
   public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop";
-
+  public static final String SCHEMA = "schema";
+  public static final String SCHEMA_URI = "schemaURI";
+  public static final String SEQUENCE_ID = "sequenceId";
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = "segmentNameGenerator.type";
+  public static final String SEGMENT_NAME_GENERATOR_CONFIGS = "segmentNameGenerator.configs";
+  public static final String OVERWRITE_OUTPUT = "overwriteOutput";
   public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
-  /**
-   * Helper method to create a batch config property
-   */
-  public static String constructBatchProperty(String batchType, String property) {
-    return StringUtils.join(BATCH_PREFIX, batchType, property, DOT_SEPARATOR);
+  public static final String PUSH_MODE = "push.mode";
+  public static final String PUSH_CONTROLLER_URI = "push.controller.uri";
+  public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
+  public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
+
+  public static final String INPUT_FILE_URI = "input.file.uri";
+  public static final String OUTPUT_SEGMENT_DIR_URI = "output.segment.dir.uri";
+
+  public enum SegmentIngestionType {
+    APPEND, REPLACE
   }
 
+  public class SegmentNameGeneratorType {
+    public static final String SIMPLE = "simple";
+    public static final String NORMALIZED_DATE = "normalizedDate";
+    public static final String FIXED = "fixed";
+  }
+
+  public enum SegmentPushType {
+    TAR, URI, METADATA
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 62499dd..f14895e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -19,17 +19,23 @@
 package org.apache.pinot.spi.utils;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 
 
 /**
  * Helper methods for extracting fields from IngestionConfig in a backward compatible manner
  */
 public final class IngestionConfigUtils {
+  public static final String DOT_SEPARATOR = ".";
+  private static final String DEFAULT_PUSH_MODE = "metadata";
+
   /**
    * Fetches the streamConfig from the given realtime table.
    * First, the ingestionConfigs->stream->streamConfigs will be checked.
@@ -42,8 +48,10 @@ public final class IngestionConfigUtils {
     Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
         "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
     Map<String, String> streamConfigMap = null;
-    if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
-      List<Map<String, String>> streamConfigMaps = tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
+      List<Map<String, String>> streamConfigMaps =
+          tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps();
       Preconditions.checkState(streamConfigMaps.size() == 1, "Only 1 stream supported per table");
       streamConfigMap = streamConfigMaps.get(0);
     }
@@ -92,4 +100,34 @@ public final class IngestionConfigUtils {
     return segmentIngestionFrequency;
   }
 
+  /** Wraps the 'fs.prop.'-prefixed entries of the batch config map (keys with the prefix removed) in a PinotConfiguration. */
+  public static PinotConfiguration getFsProps(Map<String, String> batchConfigMap) {
+    return new PinotConfiguration(getPropsWithPrefix(batchConfigMap, BatchConfigProperties.FS_PROP_PREFIX + DOT_SEPARATOR));
+  }
+
+  /** Same selection as getConfigMapWithPrefix, copied into a new HashMap typed as Map<String, Object>. */
+  public static Map<String, Object> getPropsWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    return new HashMap<>(getConfigMapWithPrefix(batchConfigMap, prefix));
+  }
+
+  /** Returns the entries of batchConfigMap whose key starts with prefix, re-keyed with that prefix removed. */
+  public static Map<String, String> getConfigMapWithPrefix(Map<String, String> batchConfigMap, String prefix) {
+    Map<String, String> props = new HashMap<>();
+    for (Map.Entry<String, String> entry : batchConfigMap.entrySet()) {
+      String configKey = entry.getKey();
+      // Strip the prefix literally; String.split would treat it as a regex ('.' matches any char, '[' fails to compile).
+      if (configKey.startsWith(prefix)) {
+        props.put(configKey.substring(prefix.length()), entry.getValue());
+      }
+    }
+    return props;
+  }
+
+  /**
+   * Returns the push mode stored under BatchConfigProperties.PUSH_MODE, or DEFAULT_PUSH_MODE when that value is null.
+   */
+  public static String getPushMode(Map<String, String> batchConfigMap) {
+    String configuredMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE);
+    return configuredMode != null ? configuredMode : DEFAULT_PUSH_MODE;
+  }
 }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
index bc34f1a..75bf417 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/BatchConfigTest.java
@@ -36,7 +36,6 @@ public class BatchConfigTest {
   public void testBatchConfig() {
     Map<String, String> batchConfigMap = new HashMap<>();
     String tableName = "foo_REALTIME";
-    String batchType = "s3";
     String inputDir = "s3://foo/input";
     String outputDir = "s3://foo/output";
     String fsClass = "org.apache.S3FS";
@@ -46,34 +45,18 @@ public class BatchConfigTest {
     String recordReaderClass = "org.foo.CSVRecordReader";
     String recordReaderConfigClass = "org.foo.CSVRecordReaderConfig";
     String separator = "|";
-    batchConfigMap.put(BatchConfigProperties.BATCH_TYPE, batchType);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI), inputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI), outputDir);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS), fsClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), inputFormat);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS),
-            recordReaderClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS),
-            recordReaderConfigClass);
-    batchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region",
-            region);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username",
-        username);
-    batchConfigMap.put(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator", separator);
+    batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, inputDir);
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, outputDir);
+    batchConfigMap.put(BatchConfigProperties.FS_CLASS, fsClass);
+    batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, inputFormat);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, recordReaderClass);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_CONFIG_CLASS, recordReaderConfigClass);
+    batchConfigMap.put(BatchConfigProperties.FS_PROP_PREFIX + ".region", region);
+    batchConfigMap.put(BatchConfigProperties.FS_PROP_PREFIX + ".username", username);
+    batchConfigMap.put(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator", separator);
 
     // config with all the right properties
     BatchConfig batchConfig = new BatchConfig(tableName, batchConfigMap);
-    assertEquals(batchConfig.getType(), batchType);
     assertEquals(batchConfig.getInputDirURI(), inputDir);
     assertEquals(batchConfig.getOutputDirURI(), outputDir);
     assertEquals(batchConfig.getFsClassName(), fsClass);
@@ -81,31 +64,16 @@ public class BatchConfigTest {
     assertEquals(batchConfig.getRecordReaderClassName(), recordReaderClass);
     assertEquals(batchConfig.getRecordReaderConfigClassName(), recordReaderConfigClass);
     assertEquals(batchConfig.getFsProps().size(), 2);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".region"),
-        region);
-    assertEquals(batchConfig.getFsProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_PROP_PREFIX) + ".username"),
-        username);
+    assertEquals(batchConfig.getFsProps().get(BatchConfigProperties.FS_PROP_PREFIX + ".region"), region);
+    assertEquals(batchConfig.getFsProps().get(BatchConfigProperties.FS_PROP_PREFIX + ".username"), username);
     assertEquals(batchConfig.getRecordReaderProps().size(), 1);
-    assertEquals(batchConfig.getRecordReaderProps().get(
-        BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_PROP_PREFIX)
-            + ".separator"), separator);
+    assertEquals(batchConfig.getRecordReaderProps().get(BatchConfigProperties.RECORD_READER_PROP_PREFIX + ".separator"),
+        separator);
     assertEquals(batchConfig.getTableNameWithType(), tableName);
 
     // Missing props
     Map<String, String> testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap.remove(BatchConfigProperties.BATCH_TYPE);
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'batchType");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_DIR_URI));
+    testBatchConfigMap.remove(BatchConfigProperties.INPUT_DIR_URI);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'inputDirURI");
@@ -114,8 +82,7 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.OUTPUT_DIR_URI));
+    testBatchConfigMap.remove(BatchConfigProperties.OUTPUT_DIR_URI);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'outputDirURI");
@@ -124,8 +91,7 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT));
+    testBatchConfigMap.remove(BatchConfigProperties.INPUT_FORMAT);
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for missing 'inputFormat");
@@ -134,38 +100,12 @@ public class BatchConfigTest {
     }
 
     testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .put(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.INPUT_FORMAT), "moo");
+    testBatchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "moo");
     try {
       new BatchConfig(tableName, testBatchConfigMap);
       Assert.fail("Should fail for incorrect 'inputFormat");
     } catch (IllegalArgumentException e) {
       // expected
     }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'recordReaderClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
-    new BatchConfig(tableName, testBatchConfigMap);
-
-    testBatchConfigMap = new HashMap<>(batchConfigMap);
-    testBatchConfigMap
-        .remove(BatchConfigProperties.constructBatchProperty(batchType, BatchConfigProperties.FS_CLASS));
-    try {
-      new BatchConfig(tableName, testBatchConfigMap);
-      Assert.fail("Should fail for missing 'fsClassName");
-    } catch (IllegalStateException e) {
-      // expected
-    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org