You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/06/22 22:08:37 UTC

[incubator-pinot] branch master updated: Support data preprocessing for AVRO and ORC formats (#7062)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 01b6c22  Support data preprocessing for AVRO and ORC formats (#7062)
01b6c22 is described below

commit 01b6c221059c7d90ff618c47b09ea6db338a0b54
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Tue Jun 22 15:07:53 2021 -0700

    Support data preprocessing for AVRO and ORC formats (#7062)
    
    Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
 .../v0_deprecated/pinot-hadoop/pom.xml             |   4 +
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  | 382 +++++++--------------
 .../pinot/hadoop/job/InternalConfigConstants.java  |  13 +-
 .../job/mappers/AvroDataPreprocessingMapper.java   |  85 +++++
 .../job/mappers/OrcDataPreprocessingMapper.java    |  87 +++++
 .../job/mappers/SegmentPreprocessingMapper.java    |  53 ++-
 .../AvroDataPreprocessingPartitioner.java          |  77 +++++
 .../OrcDataPreprocessingPartitioner.java           |  83 +++++
 .../preprocess/AvroDataPreprocessingHelper.java    | 155 +++++++++
 .../job/preprocess/DataPreprocessingHelper.java    | 228 ++++++++++++
 .../preprocess/DataPreprocessingHelperFactory.java |  55 +++
 .../job/preprocess/OrcDataPreprocessingHelper.java | 231 +++++++++++++
 ...ucer.java => AvroDataPreprocessingReducer.java} |  45 +--
 ...ducer.java => OrcDataPreprocessingReducer.java} |  57 +--
 .../hadoop/utils/preprocess/DataFileUtils.java     |  62 ++++
 .../utils/preprocess/DataPreprocessingUtils.java   | 100 ++++++
 .../pinot/hadoop/utils/preprocess/HadoopUtils.java |  41 +++
 .../pinot/hadoop/utils/preprocess/OrcUtils.java    |  88 +++++
 .../hadoop/utils/preprocess/TextComparator.java    |  41 +++
 .../ingestion/jobs/SegmentPreprocessingJob.java    |  26 +-
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |   7 +
 pom.xml                                            |   5 +
 22 files changed, 1569 insertions(+), 356 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
index 1c21482..339c832 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
@@ -200,6 +200,10 @@
       <classifier>hadoop2</classifier>
     </dependency>
     <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
+    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index b4e87fc..81a9902 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -27,66 +27,53 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory;
 import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
-import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
+ * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which are raw data in either Avro or ORC format.
  * Thus, the output files are partitioned, sorted, resized after this job.
  * In order to run this job, the following configs need to be specified in job properties:
  * * enable.preprocessing: false by default. Enables preprocessing job.
  */
 public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
-  private static final Logger _logger = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
-  protected FileSystem _fileSystem;
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
+
   private String _partitionColumn;
   private int _numPartitions;
   private String _partitionFunction;
-  private String _sortedColumn;
+
+  private String _sortingColumn;
+  private FieldSpec.DataType _sortingColumnType;
+
   private int _numOutputFiles;
+  private int _maxNumRecordsPerFile;
+
   private TableConfig _tableConfig;
   private org.apache.pinot.spi.data.Schema _pinotTableSchema;
 
+  private Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
+
   public HadoopSegmentPreprocessingJob(final Properties properties) {
     super(properties);
   }
@@ -94,102 +81,39 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   public void run()
       throws Exception {
     if (!_enablePreprocessing) {
-      _logger.info("Pre-processing job is disabled.");
+      LOGGER.info("Pre-processing job is disabled.");
       return;
     } else {
-      _logger.info("Starting {}", getClass().getSimpleName());
+      LOGGER.info("Starting {}", getClass().getSimpleName());
     }
 
-    _fileSystem = FileSystem.get(_inputSegmentDir.toUri(), getConf());
-    final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir);
-    Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory.");
-
-    if (_fileSystem.exists(_preprocessedOutputDir)) {
-      _logger.warn("Found output folder {}, deleting", _preprocessedOutputDir);
-      _fileSystem.delete(_preprocessedOutputDir, true);
-    }
     setTableConfigAndSchema();
-
-    _logger.info("Initializing a pre-processing job");
-    Job job = Job.getInstance(getConf());
-
-    Path sampleAvroPath = inputDataPaths.get(0);
-    int numInputPaths = inputDataPaths.size();
-
-    setValidationConfigs(job, sampleAvroPath);
-    setHadoopJobConfigs(job, numInputPaths);
-
-    // Avro Schema configs.
-    Schema avroSchema = getSchema(sampleAvroPath);
-    _logger.info("Schema is: {}", avroSchema.toString(true));
-    setSchemaParams(job, avroSchema);
-
-    // Set up mapper output key
-    Set<Schema.Field> fieldSet = new HashSet<>();
-
+    fetchPreProcessingOperations();
     fetchPartitioningConfig();
     fetchSortingConfig();
     fetchResizingConfig();
-    validateConfigsAgainstSchema(avroSchema);
-
-    // Partition configs.
-    int numReduceTasks = 0;
-    if (_partitionColumn != null) {
-      numReduceTasks = _numPartitions;
-      job.getConfiguration().set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
-      job.setPartitionerClass(GenericPartitioner.class);
-      job.getConfiguration().set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
-      if (_partitionFunction != null) {
-        job.getConfiguration().set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
-      }
-      job.getConfiguration().set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
-      setMaxNumRecordsConfigIfSpecified(job);
-    } else {
-      if (_numOutputFiles > 0) {
-        numReduceTasks = _numOutputFiles;
-      } else {
-        // default number of input paths
-        numReduceTasks = inputDataPaths.size();
-      }
-      // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
-      // so that all the rows can be spread evenly.
-      addHashCodeField(fieldSet);
-    }
-    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
-    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
-    job.setNumReduceTasks(numReduceTasks);
-
-    // Sort config.
-    if (_sortedColumn != null) {
-      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
-      job.getConfiguration().set(InternalConfigConstants.SORTED_COLUMN_CONFIG, _sortedColumn);
 
-      addSortedColumnField(avroSchema, fieldSet);
-    } else {
-      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
-      addHashCodeField(fieldSet);
-    }
+    // Cleans up preprocessed output dir if exists
+    cleanUpPreprocessedOutputs(_preprocessedOutputDir);
 
-    // Creates a wrapper for the schema of output key in mapper.
-    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
-    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
-    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+    DataPreprocessingHelper dataPreprocessingHelper =
+        DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+    dataPreprocessingHelper
+        .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
+            _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile);
 
-    AvroJob.setInputKeySchema(job, avroSchema);
-    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
-    AvroJob.setMapOutputValueSchema(job, avroSchema);
-    AvroJob.setOutputKeySchema(job, avroSchema);
+    Job job = dataPreprocessingHelper.setUpJob();
 
     // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
     // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
     // implemented so that you know what it does.
-    _logger.info("HDFS class path: " + _pathToDependencyJar);
+    LOGGER.info("HDFS class path: " + _pathToDependencyJar);
     if (_pathToDependencyJar != null) {
-      _logger.info("Copying jars locally.");
-      PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+      LOGGER.info("Copying jars locally.");
+      PinotHadoopJobPreparationHelper
+          .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, job, _pathToDependencyJar);
     } else {
-      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+      LOGGER.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
     }
 
     long startTime = System.currentTimeMillis();
@@ -199,11 +123,28 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
       throw new RuntimeException("Job failed : " + job);
     }
 
-    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+    LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+  }
+
+  private void fetchPreProcessingOperations() {
+    _preprocessingOperations = new HashSet<>();
+    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+    if (customConfig != null) {
+      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+      if (customConfigMap != null && !customConfigMap.isEmpty()) {
+        String preprocessingOperationsString =
+            customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+        DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString);
+      }
+    }
   }
 
   private void fetchPartitioningConfig() {
     // Fetch partition info from table config.
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
+      LOGGER.info("Partitioning is disabled.");
+      return;
+    }
     SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
     if (segmentPartitionConfig != null) {
       Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
@@ -215,123 +156,96 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
       }
     } else {
-      _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
     }
   }
 
   private void fetchSortingConfig() {
-    // Fetch sorting info from table config.
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+      LOGGER.info("Sorting is disabled.");
+      return;
+    }
+    // Fetch sorting info from table config first.
+    List<String> sortingColumns = new ArrayList<>();
+    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+    if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
+      for (FieldConfig fieldConfig : fieldConfigs) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+          sortingColumns.add(fieldConfig.getName());
+        }
+      }
+    }
+    if (!sortingColumns.isEmpty()) {
+      Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table.");
+      _sortingColumn = sortingColumns.get(0);
+      return;
+    }
+
+    // There is no sorted column specified in field configs, try to find sorted column from indexing config.
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     List<String> sortedColumns = indexingConfig.getSortedColumn();
     if (sortedColumns != null) {
       Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
       if (sortedColumns.size() == 1) {
-        _sortedColumn = sortedColumns.get(0);
+        _sortingColumn = sortedColumns.get(0);
+        FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+        Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn);
+        Preconditions
+            .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+        _sortingColumnType = fieldSpec.getDataType();
+        Preconditions
+            .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
+                _sortingColumn);
+        LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
       }
     }
   }
 
   private void fetchResizingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
+      LOGGER.info("Resizing is disabled.");
+      return;
+    }
     TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
     if (tableCustomConfig == null) {
       _numOutputFiles = 0;
       return;
     }
     Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
-    if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESS_NUM_FILES)) {
-      _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESS_NUM_FILES));
+    if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+      _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
       Preconditions.checkState(_numOutputFiles > 0, String
-          .format("The value of %s should be positive! Current value: %s", InternalConfigConstants.PREPROCESS_NUM_FILES,
-              _numOutputFiles));
+          .format("The value of %s should be positive! Current value: %s",
+              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
     } else {
       _numOutputFiles = 0;
     }
-  }
 
-  private void setMaxNumRecordsConfigIfSpecified(Job job) {
-    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
-    if (tableCustomConfig == null) {
-      return;
-    }
-    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
-    if (customConfigsMap != null && customConfigsMap
-        .containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
-      int maxNumRecords =
-          Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+    if (customConfigsMap != null) {
+      int maxNumRecords;
+      if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+        LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
+            InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
+            InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
+        maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+      } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
+        maxNumRecords =
+            Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
+      } else {
+        return;
+      }
+      // TODO: add a built-in minimum value for this config to avoid having too many small files.
+      // E.g. if the config is set to 1, which is smaller than this built-in value, the job should abort instead of generating too many small files.
       Preconditions.checkArgument(maxNumRecords > 0,
-          "The value of " + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE
+          "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
               + " should be positive. Current value: " + maxNumRecords);
-      _logger.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords);
-      job.getConfiguration()
-          .set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(maxNumRecords));
+      LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+      _maxNumRecordsPerFile = maxNumRecords;
     }
   }
 
-  /**
-   * Finds the avro file in the input folder, and returns its avro schema
-   * @param inputPathDir Path to input directory
-   * @return Input schema
-   * @throws IOException exception when accessing to IO
-   */
-  private Schema getSchema(Path inputPathDir)
-      throws IOException {
-    FileSystem fs = FileSystem.get(new Configuration());
-    Schema avroSchema = null;
-    for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
-      if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
-        _logger.info("Extracting schema from " + fileStatus.getPath());
-        try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) {
-          avroSchema = dataStreamReader.getSchema();
-        }
-        break;
-      }
-    }
-    return avroSchema;
-  }
-
-  private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) {
-    // Sorting is enabled. Adding sorted column value to mapper output key.
-    Schema sortedColumnSchema = schema.getField(_sortedColumn).schema();
-    Schema sortedColumnAsKeySchema;
-    if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) {
-      sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes());
-    } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) {
-      sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType());
-    } else {
-      sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType());
-    }
-    Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null);
-    fieldSet.add(columnField);
-  }
-
-  private void validateConfigsAgainstSchema(Schema schema) {
-    if (_partitionColumn != null) {
-      Preconditions.checkArgument(schema.getField(_partitionColumn) != null,
-          String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
-      Preconditions.checkArgument(_numPartitions > 0,
-          String.format("Number of partitions should be positive. Current value: %s", _numPartitions));
-      Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
-      try {
-        PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
-      } catch (IllegalArgumentException e) {
-        _logger.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
-            _partitionColumn);
-        throw new IllegalArgumentException(e);
-      }
-    }
-    if (_sortedColumn != null) {
-      Preconditions.checkArgument(schema.getField(_sortedColumn) != null,
-          String.format("Sorted column: %s is not found from the schema of input files.", _sortedColumn));
-    }
-  }
-
-  private void addHashCodeField(Set<Schema.Field> fieldSet) {
-    Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null);
-    fieldSet.add(hashCodeField);
-  }
-
   @Override
-  protected org.apache.pinot.spi.data.Schema getSchema()
+  protected Schema getSchema()
       throws IOException {
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
       if (controllerRestApi != null) {
@@ -360,74 +274,14 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null");
   }
 
-  private void setValidationConfigs(Job job, Path path)
+  /**
+   * Cleans up outputs in preprocessed output directory.
+   */
+  public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
       throws IOException {
-    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
-
-    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
-    // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one,
-    // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name
-    // and value
-    if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
-      job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
-      String timeColumnName = validationConfig.getTimeColumnName();
-      job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
-      if (timeColumnName != null) {
-        DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
-        if (dateTimeFieldSpec != null) {
-          DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
-        }
-      }
-      job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
-          IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
-      try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(path)) {
-        job.getConfiguration()
-            .set(InternalConfigConstants.TIME_COLUMN_VALUE, dataStreamReader.next().get(timeColumnName).toString());
-      }
-    }
-  }
-
-  private void setHadoopJobConfigs(Job job, int numInputPaths) {
-    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
-    // Turn this on to always firstly use class paths that user specifies.
-    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
-    // Turn this off since we don't need an empty file in the output directory
-    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
-    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
-
-    String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    if (hadoopTokenFileLocation != null) {
-      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+    if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+      LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+      HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
     }
-
-    // Mapper configs.
-    job.setMapperClass(SegmentPreprocessingMapper.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-    job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths);
-
-    // Reducer configs.
-    job.setReducerClass(SegmentPreprocessingReducer.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
-  }
-
-  private void setSchemaParams(Job job, Schema avroSchema)
-      throws IOException {
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
-    // Input and output paths.
-    FileInputFormat.setInputPaths(job, _inputSegmentDir);
-    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index 3a7938d..3701db2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -31,19 +31,26 @@ public class InternalConfigConstants {
   public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
   public static final String SEGMENT_TIME_SDF_PATTERN = "segment.time.sdf.pattern";
 
+  // The preprocessing operations that are enabled.
+  public static final String PREPROCESS_OPERATIONS = "preprocessing.operations";
+
   // Partitioning configs
   public static final String PARTITION_COLUMN_CONFIG = "partition.column";
   public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
   public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
 
-  public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+  public static final String SORTING_COLUMN_CONFIG = "sorting.column";
+  public static final String SORTING_COLUMN_TYPE = "sorting.type";
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
 
-  // max records per file in each partition. No effect otherwise.
+  @Deprecated
+  // Use PREPROCESSING_MAX_NUM_RECORDS_PER_FILE.
   public static final String PARTITION_MAX_RECORDS_PER_FILE = "partition.max.records.per.file";
+  // max records per file in each partition. No effect otherwise.
+  public static final String PREPROCESSING_MAX_NUM_RECORDS_PER_FILE = "preprocessing.max.num.records.per.file";
 
   // Number of segments we want generated.
-  public static final String PREPROCESS_NUM_FILES = "preprocess.num.files";
+  public static final String PREPROCESSING_NUM_REDUCERS = "preprocessing.num.reducers";
 
   public static final String FAIL_ON_SCHEMA_MISMATCH = "fail.on.schema.mismatch";
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
new file mode 100644
index 0000000..6278e8e
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper of the Avro data preprocessing job.
+ *
+ * <p>Every input record is emitted unchanged as the map output value. When a sorting column is configured, the map
+ * output key is the value of that column converted into a {@link WritableComparable}; otherwise the key is
+ * {@link NullWritable}.
+ */
+public class AvroDataPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, WritableComparable, AvroValue<GenericRecord>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingMapper.class);
+
+  private String _sortingColumn = null;
+  private FieldSpec.DataType _sortingColumnType = null;
+  private AvroRecordExtractor _avroRecordExtractor;
+
+  /**
+   * Creates the record extractor and reads the optional sorting column (and its type) from the job configuration.
+   */
+  @Override
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
+    _avroRecordExtractor = new AvroRecordExtractor();
+    String configuredSortingColumn = jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (configuredSortingColumn == null) {
+      LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
+      return;
+    }
+    _sortingColumn = configuredSortingColumn;
+    _sortingColumnType = FieldSpec.DataType.valueOf(jobConf.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+    LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+        _sortingColumnType);
+  }
+
+  /**
+   * Emits the record keyed by its sorting column value, or by {@link NullWritable} when no sorting column is set.
+   *
+   * @throws IllegalStateException if the sorting column value is missing or cannot be converted
+   */
+  @Override
+  public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
+      throws IOException, InterruptedException {
+    GenericRecord record = key.datum();
+    if (_sortingColumn == null) {
+      context.write(NullWritable.get(), new AvroValue<>(record));
+      return;
+    }
+
+    Object rawValue = record.get(_sortingColumn);
+    Preconditions
+        .checkState(rawValue != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
+            record);
+    Object convertedValue = _avroRecordExtractor.convert(rawValue);
+    Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s",
+        rawValue, _sortingColumn, record);
+    WritableComparable sortingKey = toSortingKey(convertedValue, record);
+    context.write(sortingKey, new AvroValue<>(record));
+  }
+
+  /**
+   * Converts the extracted sorting column value into the map output key, wrapping any failure with record context.
+   */
+  private WritableComparable toSortingKey(Object convertedValue, GenericRecord record) {
+    try {
+      return DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+    } catch (Exception e) {
+      String message =
+          String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record);
+      throw new IllegalStateException(message, e);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
new file mode 100644
index 0000000..d7d0694
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper of the ORC data preprocessing job.
+ *
+ * <p>Every input struct is emitted as the map output value through a reused {@link OrcValue} wrapper. When a sorting
+ * column is configured, the map output key is the value of that column converted into a {@link WritableComparable};
+ * otherwise the key is {@link NullWritable}.
+ */
+public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct, WritableComparable, OrcValue> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingMapper.class);
+
+  private final OrcValue _valueWrapper = new OrcValue();
+  private String _sortingColumn = null;
+  private FieldSpec.DataType _sortingColumnType = null;
+  // Index of the sorting column within the ORC struct, resolved from the first struct seen (-1 until then)
+  private int _sortingColumnId = -1;
+
+  /**
+   * Reads the optional sorting column (and its type) from the job configuration.
+   */
+  @Override
+  public void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
+    String configuredSortingColumn = jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (configuredSortingColumn == null) {
+      LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
+      return;
+    }
+    _sortingColumn = configuredSortingColumn;
+    _sortingColumnType = FieldSpec.DataType.valueOf(jobConf.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+    LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+        _sortingColumnType);
+  }
+
+  /**
+   * Emits the struct keyed by its sorting column value, or by {@link NullWritable} when no sorting column is set.
+   *
+   * @throws IllegalStateException if the sorting column is not part of the struct or its value cannot be converted
+   */
+  @Override
+  public void map(NullWritable key, OrcStruct value, Context context)
+      throws IOException, InterruptedException {
+    _valueWrapper.value = value;
+    if (_sortingColumn == null) {
+      context.write(NullWritable.get(), _valueWrapper);
+      return;
+    }
+
+    if (_sortingColumnId == -1) {
+      List<String> orcFieldNames = value.getSchema().getFieldNames();
+      _sortingColumnId = orcFieldNames.indexOf(_sortingColumn);
+      Preconditions.checkState(_sortingColumnId != -1, "Failed to find sorting column: %s in the ORC fields: %s",
+          _sortingColumn, orcFieldNames);
+      LOGGER.info("Field id for sorting column: {} is: {}", _sortingColumn, _sortingColumnId);
+    }
+
+    WritableComparable rawValue = value.getFieldValue(_sortingColumnId);
+    WritableComparable sortingKey;
+    try {
+      Object convertedValue = OrcUtils.convert(rawValue);
+      sortingKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+    } catch (Exception e) {
+      String message = String
+          .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
+              _sortingColumnId, value);
+      throw new IllegalStateException(message, e);
+    }
+    context.write(sortingKey, _valueWrapper);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index e3f088d..3d3fcec 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
+  private Configuration _jobConf;
   private String _sortedColumn = null;
   private String _timeColumn = null;
   private Schema _outputKeySchema;
@@ -52,43 +54,42 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
 
+  /**
+   * Initializes the mapper from the job configuration.
+   *
+   * <p>Logs all job configurations, then reads the table name and the append flag. For append tables it also reads
+   * the time column settings and builds the segment name generator used to normalize the sample time column value.
+   * Finally it reads the sorting column, the map output key/value schemas and the partitioning flag.
+   */
   @Override
   public void setup(final Context context) {
-    Configuration configuration = context.getConfiguration();
-
-    String tableName = configuration.get(JobConfigConstants.SEGMENT_TABLE_NAME);
-
-    _isAppend = configuration.get(InternalConfigConstants.IS_APPEND).equalsIgnoreCase("true");
+    _jobConf = context.getConfiguration();
+    logConfigurations();
+    String tableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
 
+    _isAppend = "true".equalsIgnoreCase(_jobConf.get(InternalConfigConstants.IS_APPEND));
     if (_isAppend) {
       // Get time column name
-      _timeColumn = configuration.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
+      _timeColumn = _jobConf.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
 
       // Get sample time column value
-      String timeColumnValue = configuration.get(InternalConfigConstants.TIME_COLUMN_VALUE);
-      String pushFrequency = configuration.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
+      String timeColumnValue = _jobConf.get(InternalConfigConstants.TIME_COLUMN_VALUE);
+      String pushFrequency = _jobConf.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
 
-      String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
-      String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
+      String timeType = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
+      String timeFormat = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
       DateTimeFormatSpec dateTimeFormatSpec;
       if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) {
         dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat);
       } else {
         dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat,
-            configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+            _jobConf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
       }
       _normalizedDateSegmentNameGenerator =
           new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec);
       _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
     }
 
-    String sortedColumn = configuration.get(InternalConfigConstants.SORTED_COLUMN_CONFIG);
+    String sortedColumn = _jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
     // Logging the configs for the mapper
     LOGGER.info("Sorted Column: " + sortedColumn);
     if (sortedColumn != null) {
       _sortedColumn = sortedColumn;
     }
-    _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration);
-    _outputSchema = AvroJob.getMapOutputValueSchema(configuration);
-    _enablePartitioning = Boolean.parseBoolean(configuration.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
+    _outputKeySchema = AvroJob.getMapOutputKeySchema(_jobConf);
+    _outputSchema = AvroJob.getMapOutputValueSchema(_jobConf);
+    _enablePartitioning = Boolean.parseBoolean(_jobConf.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
   }
 
   @Override
@@ -130,4 +131,26 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
       throw e;
     }
   }
+
+  /**
+   * Logs every key/value pair of the job configuration as a single {@code {key=value, ...}} message framed by two
+   * banner lines.
+   * NOTE(review): this dumps the whole configuration at INFO level; confirm it cannot contain credentials.
+   */
+  protected void logConfigurations() {
+    StringBuilder configDump = new StringBuilder();
+    configDump.append('{');
+    // Empty before the first entry, so the separator is only written between entries
+    String separator = "";
+    for (Map.Entry<String, String> configEntry : _jobConf) {
+      configDump.append(separator);
+      configDump.append(configEntry.getKey()).append('=').append(configEntry.getValue());
+      separator = ", \n";
+    }
+    configDump.append('}');
+
+    final String banner = "*********************************************************************";
+    LOGGER.info(banner);
+    LOGGER.info("Job Configurations: {}", configDump.toString());
+    LOGGER.info(banner);
+  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..74799c7
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Partitioner of the Avro data preprocessing job.
+ *
+ * <p>The partition of a record is computed by applying the configured partition function to the string form of the
+ * record's partition column value.
+ */
+public class AvroDataPreprocessingPartitioner extends Partitioner<WritableComparable, AvroValue<GenericRecord>> implements Configurable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn;
+  private PartitionFunction _partitionFunction;
+  private AvroRecordExtractor _avroRecordExtractor;
+
+  /**
+   * Stores the configuration and builds the partition function from the partition column, function name and number
+   * of partitions found in it.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    _conf = conf;
+    _avroRecordExtractor = new AvroRecordExtractor();
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String functionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    String configuredNumPartitions = conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG);
+    int partitionCount = Integer.parseInt(configuredNumPartitions);
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(functionName, partitionCount);
+    LOGGER.info(
+        "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, functionName, partitionCount);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  /**
+   * Returns the partition of the record wrapped by {@code value}; {@code key} and {@code numPartitions} are not used.
+   *
+   * @throws IllegalStateException if the partition column value is missing, cannot be converted, or is neither a
+   *         Number nor a String
+   */
+  @Override
+  public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
+    GenericRecord datum = value.datum();
+    Object rawValue = datum.get(_partitionColumn);
+    Preconditions
+        .checkState(rawValue != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
+            datum);
+    Object partitionValue = _avroRecordExtractor.convert(rawValue);
+    Preconditions.checkState(partitionValue != null, "Invalid value: %s for partition column: %s in record: %s",
+        rawValue, _partitionColumn, datum);
+    boolean isSupportedType = partitionValue instanceof Number || partitionValue instanceof String;
+    Preconditions.checkState(isSupportedType,
+        "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
+        partitionValue.getClass(), datum);
+    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+    String partitionKey = partitionValue.toString();
+    return _partitionFunction.getPartition(partitionKey);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..bef2cef
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Partitioner of the ORC data preprocessing job.
+ *
+ * <p>The partition of a struct is computed by applying the configured partition function to the string form of the
+ * struct's partition column value.
+ */
+public class OrcDataPreprocessingPartitioner extends Partitioner<WritableComparable, OrcValue> implements Configurable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn;
+  private PartitionFunction _partitionFunction;
+  // Index of the partition column within the ORC struct, resolved from the first struct seen (-1 until then)
+  private int _partitionColumnId = -1;
+
+  /**
+   * Stores the configuration and builds the partition function from the partition column, function name and number
+   * of partitions found in it.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    _conf = conf;
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    LOGGER.info(
+        "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, partitionFunctionName, numPartitions);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  /**
+   * Returns the partition of the struct wrapped by {@code value}; {@code key} and {@code numPartitions} are not used.
+   *
+   * @throws IllegalStateException if the partition column is not part of the struct, or its value cannot be
+   *         converted or converts to null
+   */
+  @Override
+  public int getPartition(WritableComparable key, OrcValue value, int numPartitions) {
+    OrcStruct orcStruct = (OrcStruct) value.value;
+    if (_partitionColumnId == -1) {
+      List<String> fieldNames = orcStruct.getSchema().getFieldNames();
+      _partitionColumnId = fieldNames.indexOf(_partitionColumn);
+      Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s",
+          _partitionColumn, fieldNames);
+      LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
+    }
+    WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
+    Object convertedValue;
+    try {
+      convertedValue = OrcUtils.convert(partitionColumnValue);
+    } catch (Exception e) {
+      throw new IllegalStateException(String
+          .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s", _partitionColumn,
+              _partitionColumnId, orcStruct), e);
+    }
+    // Mirror the Avro partitioner: report a missing value with context instead of a bare NullPointerException below
+    Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in ORC struct: %s",
+        partitionColumnValue, _partitionColumn, orcStruct);
+    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+    return _partitionFunction.getPartition(convertedValue.toString());
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..9e5f5f2
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
+import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link DataPreprocessingHelper} for Avro input data.
+ *
+ * <p>Plugs the Avro mapper, partitioner and reducer into the preprocessing job and validates the partitioning and
+ * sorting configs against the Avro schema read from the sample data path.
+ */
+public class AvroDataPreprocessingHelper extends DataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingHelper.class);
+
+  public AvroDataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+    super(inputDataPaths, outputPath);
+  }
+
+  @Override
+  public Class<? extends Partitioner> getPartitioner() {
+    return AvroDataPreprocessingPartitioner.class;
+  }
+
+  /**
+   * Sets the input format, mapper, reducer, output format and Avro schemas on the given job, after validating the
+   * preprocessing configs against the Avro schema of the sample data.
+   *
+   * @param job Job to configure
+   * @throws IOException exception when accessing to IO
+   * @throws IllegalStateException if no Avro schema can be extracted from the sample data path
+   * @throws IllegalArgumentException if the partitioning or sorting configs do not match the schema
+   */
+  @Override
+  public void setUpMapperReducerConfigs(Job job)
+      throws IOException {
+    Schema avroSchema = getAvroSchema(_sampleRawDataPath);
+    // getAvroSchema() returns null when no '.avro' file is found; fail with context instead of a NullPointerException
+    Preconditions.checkState(avroSchema != null, "Failed to find an Avro file to extract the schema from under: %s",
+        _sampleRawDataPath);
+    LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+    validateConfigsAgainstSchema(avroSchema);
+
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setMapperClass(AvroDataPreprocessingMapper.class);
+
+    job.setReducerClass(AvroDataPreprocessingReducer.class);
+    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
+    AvroMultipleOutputs.setCountersEnabled(job, true);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+    job.setOutputKeyClass(AvroKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    AvroJob.setInputKeySchema(job, avroSchema);
+    AvroJob.setMapOutputValueSchema(job, avroSchema);
+    AvroJob.setOutputKeySchema(job, avroSchema);
+  }
+
+  /**
+   * Returns the string form of the given time column in the first record of the sample data file.
+   *
+   * @param timeColumnName Name of the time column
+   * @return Time column value of the first record
+   * @throws IOException exception when accessing to IO
+   * @throws IllegalStateException if the sample file has no record or the record has no value for the time column
+   */
+  @Override
+  String getSampleTimeColumnValue(String timeColumnName)
+      throws IOException {
+    try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(_sampleRawDataPath)) {
+      Preconditions.checkState(dataStreamReader.hasNext(), "Failed to find any record in sample data file: %s",
+          _sampleRawDataPath);
+      Object timeColumnValue = dataStreamReader.next().get(timeColumnName);
+      Preconditions.checkState(timeColumnValue != null, "Failed to find value for time column: %s in sample data file: %s",
+          timeColumnName, _sampleRawDataPath);
+      return timeColumnValue.toString();
+    }
+  }
+
+  /**
+   * Finds the first avro file under the given path, and returns its avro schema
+   * @param inputPathDir Path to an avro file, or to a directory containing avro files
+   * @return Input schema, or null if no file with the '.avro' suffix is found
+   * @throws IOException exception when accessing to IO
+   */
+  private Schema getAvroSchema(Path inputPathDir)
+      throws IOException {
+    Schema avroSchema = null;
+    for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
+      if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
+        Path avroFile = fileStatus.getPath();
+        LOGGER.info("Extracting schema from " + avroFile);
+        // Open the matched file itself: the listed path may be a directory, which cannot be opened as a stream
+        try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(avroFile)) {
+          avroSchema = dataStreamReader.getSchema();
+        }
+        break;
+      }
+    }
+    return avroSchema;
+  }
+
+  /**
+   * Helper method that returns avro reader for the given avro file.
+   * If file name ends in 'gz' then returns the GZIP version, otherwise gives the regular reader.
+   *
+   * @param avroFile File to read
+   * @return Avro reader for the file.
+   * @throws IOException exception when accessing to IO
+   */
+  private DataFileStream<GenericRecord> getAvroReader(Path avroFile)
+      throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    if (avroFile.getName().endsWith("gz")) {
+      return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>());
+    } else {
+      return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>());
+    }
+  }
+
+  /**
+   * Validates the partitioning and sorting configs against the schema of the input files.
+   *
+   * @param schema Avro schema of the input files
+   * @throws IllegalArgumentException if the partition or sorting column is not in the schema, the number of
+   *         partitions is not positive, or the partition function is missing or unknown
+   */
+  private void validateConfigsAgainstSchema(Schema schema) {
+    if (_partitionColumn != null) {
+      Preconditions.checkArgument(schema.getField(_partitionColumn) != null,
+          String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
+      Preconditions.checkArgument(_numPartitions > 0,
+          String.format("Number of partitions should be positive. Current value: %s", _numPartitions));
+      Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
+      try {
+        PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
+      } catch (IllegalArgumentException e) {
+        // Report the offending partition function (not the partition column) in the error message
+        LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
+            _partitionFunction);
+        throw new IllegalArgumentException(e);
+      }
+    }
+    if (_sortingColumn != null) {
+      Preconditions.checkArgument(schema.getField(_sortingColumn) != null,
+          String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn));
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
new file mode 100644
index 0000000..a505d09
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class DataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class);
+
+  String _partitionColumn;
+  int _numPartitions;
+  String _partitionFunction;
+
+  String _sortingColumn;
+  private FieldSpec.DataType _sortingColumnType;
+
+  private int _numOutputFiles;
+  private int _maxNumRecordsPerFile;
+
+  private TableConfig _tableConfig;
+  private Schema _pinotTableSchema;
+
+  List<Path> _inputDataPaths;
+  Path _sampleRawDataPath;
+  Path _outputPath;
+
+  /**
+   * Creates a helper for the given input files and output location.
+   *
+   * @param inputDataPaths input data files; the first entry is kept as the sample raw data path
+   * @param outputPath path stored as the output path of the preprocessing job
+   */
+  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+    _outputPath = outputPath;
+    _inputDataPaths = inputDataPaths;
+    // The first input file doubles as the sample from which schema and time column value are read.
+    _sampleRawDataPath = _inputDataPaths.get(0);
+  }
+
+  /**
+   * Stores the table level and preprocessing level configs that {@link #setUpJob()} reads later on.
+   *
+   * @param tableConfig table config of the Pinot table
+   * @param tableSchema schema of the Pinot table
+   * @param partitionColumn column to partition on, or {@code null} to skip partitioning
+   * @param numPartitions number of partitions, used when a partition column is set
+   * @param partitionFunction name of the partition function, used when a partition column is set
+   * @param sortingColumn column to sort on, or {@code null} to skip sorting
+   * @param sortingColumnType data type of the sorting column
+   * @param numOutputFiles number of output files, used when no partition column is set
+   * @param maxNumRecordsPerFile maximum number of records per output file, forwarded to the job config
+   */
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    // Table level configs
+    _pinotTableSchema = tableSchema;
+    _tableConfig = tableConfig;
+
+    // Partitioning
+    _partitionFunction = partitionFunction;
+    _numPartitions = numPartitions;
+    _partitionColumn = partitionColumn;
+
+    // Sorting
+    _sortingColumnType = sortingColumnType;
+    _sortingColumn = sortingColumn;
+
+    // Output sizing
+    _maxNumRecordsPerFile = maxNumRecordsPerFile;
+    _numOutputFiles = numOutputFiles;
+  }
+
+  /**
+   * Creates and configures the Hadoop job that preprocesses the input files.
+   *
+   * <p>{@link #registerConfigs} must be called first: the table config it stores is read while setting up the job.
+   * The job is configured as follows:
+   * <ul>
+   *   <li>every input path is added to the job and the number of input paths is set as the number of map tasks;</li>
+   *   <li>the map output key class follows the sorting column type ({@link NullWritable} without sorting column);</li>
+   *   <li>the number of reduce tasks is the number of partitions when a partition column is set, otherwise the
+   *   number of output files when positive, otherwise the number of input paths.</li>
+   * </ul>
+   *
+   * @return the configured job; this method does not submit it
+   * @throws IOException if the job cannot be created or a subclass hook fails with an I/O error
+   * @throws IllegalStateException if a sorting column is set but its type is missing or not supported
+   */
+  public Job setUpJob()
+      throws IOException {
+    LOGGER.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+    Configuration jobConf = job.getConfiguration();
+    // Input and output paths.
+    int numInputPaths = _inputDataPaths.size();
+    jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+    setValidationConfigs(job, _sampleRawDataPath);
+    for (Path inputFile : _inputDataPaths) {
+      FileInputFormat.addInputPath(job, inputFile);
+    }
+    setHadoopJobConfigs(job);
+
+    // Sorting column
+    if (_sortingColumn != null) {
+      // Fail with a descriptive message instead of a bare NullPointerException further down.
+      if (_sortingColumnType == null) {
+        throw new IllegalStateException("Sorting column type is not set for sorting column: " + _sortingColumn);
+      }
+      LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+
+      switch (_sortingColumnType) {
+        case INT:
+          job.setMapOutputKeyClass(IntWritable.class);
+          break;
+        case LONG:
+          job.setMapOutputKeyClass(LongWritable.class);
+          break;
+        case FLOAT:
+          job.setMapOutputKeyClass(FloatWritable.class);
+          break;
+        case DOUBLE:
+          job.setMapOutputKeyClass(DoubleWritable.class);
+          break;
+        case STRING:
+          job.setMapOutputKeyClass(Text.class);
+          job.setSortComparatorClass(TextComparator.class);
+          break;
+        default:
+          throw new IllegalStateException(
+              "Unsupported sorting column type: " + _sortingColumnType + " for sorting column: " + _sortingColumn);
+      }
+    } else {
+      job.setMapOutputKeyClass(NullWritable.class);
+    }
+
+    // Partition column
+    int numReduceTasks;
+    if (_partitionColumn != null) {
+      numReduceTasks = _numPartitions;
+      jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+      // NOTE(review): this default is replaced by the format specific partitioner set a few lines below.
+      job.setPartitionerClass(GenericPartitioner.class);
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
+      if (_partitionFunction != null) {
+        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+      }
+      jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+      job.setPartitionerClass(getPartitioner());
+    } else if (_numOutputFiles > 0) {
+      numReduceTasks = _numOutputFiles;
+    } else {
+      // default number of input paths
+      numReduceTasks = numInputPaths;
+    }
+    // Maximum number of records per output file
+    jobConf
+        .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
+    // Number of reducers
+    LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    job.setNumReduceTasks(numReduceTasks);
+
+    setUpMapperReducerConfigs(job);
+
+    return job;
+  }
+
+  /**
+   * Returns the partitioner class that {@link #setUpJob()} installs on the job when a partition column is configured.
+   */
+  abstract Class<? extends Partitioner> getPartitioner();
+
+  /**
+   * Hook called at the end of {@link #setUpJob()} so that a subclass can apply its format specific job settings
+   * (the ORC implementation, for example, sets the input/output formats, the mapper and the reducer here).
+   *
+   * @param job job being set up
+   * @throws IOException if the subclass fails with an I/O error
+   */
+  abstract void setUpMapperReducerConfigs(Job job)
+      throws IOException;
+
+  /**
+   * Returns a sample value of the given time column in string form. A {@code null} result is allowed: the caller
+   * then leaves the time column value config unset.
+   *
+   * @param timeColumnName name of the time column to look up
+   * @throws IOException if the subclass fails with an I/O error
+   */
+  abstract String getSampleTimeColumnValue(String timeColumnName)
+      throws IOException;
+
+  /**
+   * Copies the time column related settings of the table into the job config for the APPEND use case.
+   *
+   * <p>Nothing is set unless the batch segment ingestion type of the table is APPEND. For APPEND tables the push
+   * frequency is always set; the time column name, its format settings and a sample time column value are only set
+   * when the table declares a time column.
+   *
+   * @param job job whose configuration is updated
+   * @param path currently unused
+   * @throws IOException if reading the sample time column value fails
+   */
+  private void setValidationConfigs(Job job, Path path)
+      throws IOException {
+    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
+    // If the use case is an append use case, check that one time unit is contained in one file. If there is more
+    // than one, the job should be disabled, as we should not resize for these use cases. Therefore, setting the
+    // time column name and value
+    if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
+      Configuration jobConf = job.getConfiguration();
+      jobConf.set(InternalConfigConstants.IS_APPEND, "true");
+      jobConf.set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+      String timeColumnName = validationConfig.getTimeColumnName();
+      // Hadoop's Configuration#set rejects null values, so every time column setting (including the name itself)
+      // is only written when the table actually declares a time column.
+      if (timeColumnName != null) {
+        jobConf.set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
+        DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+        if (dateTimeFieldSpec != null) {
+          DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+          jobConf.set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+          jobConf.set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
+          jobConf.set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+        }
+
+        String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
+        if (sampleTimeColumnValue != null) {
+          jobConf.set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue);
+        }
+      }
+    }
+  }
+
+  /**
+   * Applies the settings shared by every preprocessing job: jar, job name, output path, user class path
+   * precedence, no success marker file and, when available, the Hadoop token file.
+   *
+   * @param job job to configure
+   */
+  private void setHadoopJobConfigs(Job job) {
+    String jobName = getClass().getName();
+    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+    job.setJobName(jobName);
+    FileOutputFormat.setOutputPath(job, _outputPath);
+
+    Configuration jobConf = job.getConfiguration();
+    jobConf.set(JobContext.JOB_NAME, jobName);
+    // Class paths specified by the user take precedence.
+    jobConf.set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+    // No empty marker file is needed in the output directory.
+    jobConf.set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+    // Forward the token file location when the environment provides one.
+    String tokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (tokenFileLocation == null) {
+      return;
+    }
+    jobConf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFileLocation);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
new file mode 100644
index 0000000..2e91773
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory that picks the {@link DataPreprocessingHelper} implementation matching the format of the input files.
+ */
+public class DataPreprocessingHelperFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+
+  /**
+   * Looks up the AVRO and ORC data files under the given input location and returns the matching helper.
+   *
+   * @param inputPaths location searched for data files
+   * @param outputPath output path handed to the created helper
+   * @return an AVRO helper when only AVRO files are found, an ORC helper when only ORC files are found
+   * @throws IOException if listing the data files fails
+   * @throws IllegalStateException if both formats are present, or if neither is found
+   */
+  public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
+      throws IOException {
+    List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION);
+    List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION);
+    boolean hasAvroFiles = !avroFiles.isEmpty();
+    boolean hasOrcFiles = !orcFiles.isEmpty();
+
+    // Exactly one of the two formats must be present.
+    Preconditions.checkState(!(hasAvroFiles && hasOrcFiles),
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+        inputPaths);
+    Preconditions.checkState(hasAvroFiles || hasOrcFiles, "Failed to find any AVRO or ORC file in directories: %s",
+        inputPaths);
+
+    if (!hasAvroFiles) {
+      LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths);
+      return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+    }
+    LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths);
+    return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..aec0bb0
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingHelper extends DataPreprocessingHelper {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingHelper.class);
+
+  /**
+   * Creates a helper for ORC input; both arguments are passed unchanged to the {@link DataPreprocessingHelper}
+   * constructor.
+   */
+  public OrcDataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+    super(inputDataPaths, outputPath);
+  }
+
+  /**
+   * Returns {@link OrcDataPreprocessingPartitioner} as the partitioner class for ORC preprocessing jobs.
+   */
+  @Override
+  Class<? extends Partitioner> getPartitioner() {
+    return OrcDataPreprocessingPartitioner.class;
+  }
+
+  /**
+   * Configures the job for ORC input and output: reads the ORC schema from the sample file, validates the
+   * registered configs against it, then sets the input format, mapper, reducer, output format and the schema
+   * used for the shuffle values and for the output.
+   *
+   * @param job job being set up
+   */
+  @Override
+  void setUpMapperReducerConfigs(Job job) {
+    // Validation runs before any job setting is applied.
+    TypeDescription sampleSchema = getOrcSchema(_sampleRawDataPath);
+    String schemaString = sampleSchema.toString();
+    LOGGER.info("Orc schema is: {}", schemaString);
+    validateConfigsAgainstSchema(sampleSchema);
+
+    // The same schema describes both the shuffled values and the final output.
+    Configuration jobConf = job.getConfiguration();
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, schemaString);
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, schemaString);
+
+    // Map side
+    job.setInputFormatClass(OrcInputFormat.class);
+    job.setMapperClass(OrcDataPreprocessingMapper.class);
+    job.setMapOutputValueClass(OrcValue.class);
+
+    // Reduce side; LazyOutputFormat avoids creating empty files.
+    job.setReducerClass(OrcDataPreprocessingReducer.class);
+    LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(OrcStruct.class);
+  }
+
+  /**
+   * Reads the value of the given time column from the first row batch of the sample ORC file.
+   *
+   * @param timeColumnName name of the time column to look up
+   * @return the first value of the column in string form, or {@code null} when the column is not part of the ORC
+   *         schema, no row batch could be read, or the value itself is null
+   * @throws IOException if the sample file cannot be opened or read
+   */
+  @Override
+  String getSampleTimeColumnValue(String timeColumnName)
+      throws IOException {
+    try (Reader reader = OrcFile
+        .createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
+      TypeDescription orcSchema = reader.getSchema();
+      // indexOf also copes with a null column name, which simply is not found.
+      int timeColumnIndex = orcSchema.getFieldNames().indexOf(timeColumnName);
+      if (timeColumnIndex < 0) {
+        return null;
+      }
+
+      Reader.Options options = new Reader.Options();
+      options.range(0, 1);
+      VectorizedRowBatch vectorizedRowBatch = orcSchema.createRowBatch();
+      // The record reader holds its own resources and has to be closed as well.
+      try (RecordReader records = reader.rows(options)) {
+        if (!records.nextBatch(vectorizedRowBatch)) {
+          return null;
+        }
+        // Extract the value while the record reader is still open.
+        TypeDescription.Category category = orcSchema.getChildren().get(timeColumnIndex).getCategory();
+        return getValue(timeColumnName, vectorizedRowBatch.cols[timeColumnIndex], category);
+      }
+    }
+  }
+
+  /**
+   * Converts the first entry of the given column vector into its string form.
+   *
+   * @param field name of the field, only used in the error message
+   * @param columnVector vector holding the values of the field
+   * @param category ORC type category of the field, which decides how the vector is read
+   * @return string form of the first value, or {@code null} when that value is marked as null
+   * @throws IllegalStateException if the category is not one of the handled types
+   */
+  private String getValue(String field, ColumnVector columnVector, TypeDescription.Category category) {
+    switch (category) {
+      case BOOLEAN: {
+        LongColumnVector booleans = (LongColumnVector) columnVector;
+        return isFirstValueNull(booleans) ? null : Boolean.toString(booleans.vector[0] == 1);
+      }
+      case BYTE:
+      case SHORT:
+      case INT: {
+        // Formatted as an int
+        LongColumnVector ints = (LongColumnVector) columnVector;
+        return isFirstValueNull(ints) ? null : Integer.toString((int) ints.vector[0]);
+      }
+      case LONG:
+      case DATE: {
+        // Formatted as a long
+        LongColumnVector longs = (LongColumnVector) columnVector;
+        return isFirstValueNull(longs) ? null : Long.toString(longs.vector[0]);
+      }
+      case TIMESTAMP: {
+        // Formatted as a long
+        TimestampColumnVector timestamps = (TimestampColumnVector) columnVector;
+        return isFirstValueNull(timestamps) ? null : Long.toString(timestamps.time[0]);
+      }
+      case FLOAT: {
+        // Formatted as a float
+        DoubleColumnVector floats = (DoubleColumnVector) columnVector;
+        return isFirstValueNull(floats) ? null : Float.toString((float) floats.vector[0]);
+      }
+      case DOUBLE: {
+        // Formatted as a double
+        DoubleColumnVector doubles = (DoubleColumnVector) columnVector;
+        return isFirstValueNull(doubles) ? null : Double.toString(doubles.vector[0]);
+      }
+      case STRING:
+      case VARCHAR:
+      case CHAR: {
+        // Decoded as a UTF-8 string
+        BytesColumnVector strings = (BytesColumnVector) columnVector;
+        return isFirstValueNull(strings) ? null
+            : StringUtils.decodeUtf8(strings.vector[0], strings.start[0], strings.length[0]);
+      }
+      case BINARY: {
+        // Raw bytes, also decoded as UTF-8
+        BytesColumnVector binaries = (BytesColumnVector) columnVector;
+        return isFirstValueNull(binaries) ? null
+            : new String(binaries.vector[0], binaries.start[0], binaries.length[0], StandardCharsets.UTF_8);
+      }
+      default:
+        // Unsupported types
+        throw new IllegalStateException("Unsupported field type: " + category + " for field: " + field);
+    }
+  }
+
+  /**
+   * Returns whether the first entry of the vector is marked as null.
+   */
+  private static boolean isFirstValueNull(ColumnVector columnVector) {
+    return !columnVector.noNulls && columnVector.isNull[0];
+  }
+
+  /**
+   * Opens the given ORC file and returns the schema reported by its reader.
+   *
+   * @param orcFile ORC file to read the schema from
+   * @return ORC schema of the file
+   * @throws IllegalStateException wrapping any exception hit while opening, reading or closing the file
+   */
+  private TypeDescription getOrcSchema(Path orcFile) {
+    try (Reader orcReader = OrcFile.createReader(orcFile, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
+      return orcReader.getSchema();
+    } catch (Exception e) {
+      throw new IllegalStateException("Caught exception while extracting ORC schema from file: " + orcFile, e);
+    }
+  }
+
+  /**
+   * Validates the registered partitioning and sorting configs against the ORC schema of the input files.
+   *
+   * <p>When a partition column is set, it must be one of the schema's field names, the number of partitions must be
+   * positive and the partition function must be a name accepted by {@code PartitionFunctionType.fromString}. When a
+   * sorting column is set, it must be one of the schema's field names.
+   *
+   * @param schema ORC schema to validate against
+   * @throws IllegalArgumentException if any of the checks above fails
+   */
+  private void validateConfigsAgainstSchema(TypeDescription schema) {
+    List<String> fieldNames = schema.getFieldNames();
+    if (_partitionColumn != null) {
+      Preconditions.checkArgument(fieldNames.contains(_partitionColumn),
+          String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
+      Preconditions.checkArgument(_numPartitions > 0,
+          String.format("Number of partitions should be positive. Current value: %s", _numPartitions));
+      Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
+      try {
+        PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
+      } catch (IllegalArgumentException e) {
+        // Report the rejected partition function itself (not the partition column) so the message is actionable.
+        LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
+            _partitionFunction);
+        throw new IllegalArgumentException(e);
+      }
+    }
+    if (_sortingColumn != null) {
+      Preconditions.checkArgument(fieldNames.contains(_sortingColumn),
+          String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn));
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
similarity index 63%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
index 9c07e6f..5fcbf10 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.hadoop.job.reducers;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
@@ -33,33 +32,43 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class AvroDataPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingReducer.class);
 
   private AvroMultipleOutputs _multipleOutputs;
-  private AtomicInteger _counter;
-  private int _maxNumberOfRecords;
+  private long _numRecords;
+  private int _maxNumRecordsPerFile;
   private String _filePrefix;
 
   @Override
   public void setup(Context context) {
-    LOGGER.info("Using multiple outputs strategy.");
     Configuration configuration = context.getConfiguration();
-    _multipleOutputs = new AvroMultipleOutputs(context);
-    _counter = new AtomicInteger();
     // If it's 0, the output file won't be split into multiple files.
     // If not, output file will be split when the number of records reaches this number.
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) {
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new AvroMultipleOutputs(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+      LOGGER.info("Initialized AvroDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized AvroDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
   public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final AvroValue<GenericRecord> value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+        _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+      }
+    } else {
+      for (final AvroValue<GenericRecord> value : values) {
+        context.write(new AvroKey<>(value.datum()), NullWritable.get());
+      }
     }
   }
 
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
similarity index 54%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
index 9c07e6f..a3387a2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
@@ -19,47 +19,56 @@
 package org.apache.pinot.hadoop.job.reducers;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class OrcDataPreprocessingReducer extends Reducer<WritableComparable, OrcValue, NullWritable, OrcStruct> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingReducer.class);
 
-  private AvroMultipleOutputs _multipleOutputs;
-  private AtomicInteger _counter;
-  private int _maxNumberOfRecords;
+  private int _maxNumRecordsPerFile;
+  private MultipleOutputs<NullWritable, OrcStruct> _multipleOutputs;
+  private long _numRecords;
   private String _filePrefix;
 
   @Override
   public void setup(Context context) {
-    LOGGER.info("Using multiple outputs strategy.");
     Configuration configuration = context.getConfiguration();
-    _multipleOutputs = new AvroMultipleOutputs(context);
-    _counter = new AtomicInteger();
     // If it's 0, the output file won't be split into multiple files.
     // If not, output file will be split when the number of records reaches this number.
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) {
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new MultipleOutputs<>(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+      LOGGER.info("Initialized OrcDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized OrcDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
-  public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+  public void reduce(WritableComparable key, Iterable<OrcValue> values, Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final OrcValue value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+        _multipleOutputs.write(NullWritable.get(), (OrcStruct) value.value, fileName);
+      }
+    } else {
+      for (final OrcValue value : values) {
+        context.write(NullWritable.get(), (OrcStruct) value.value);
+      }
     }
   }
 
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
new file mode 100644
index 0000000..58e1c1d
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+
+public class DataFileUtils {
+  private DataFileUtils() {
+  }
+
+  public static final String AVRO_FILE_EXTENSION = ".avro";
+  public static final String ORC_FILE_EXTENSION = ".orc";
+
+  /**
+   * Returns the data files under the input directory with the given file extension.
+   */
+  public static List<Path> getDataFiles(Path inputDir, String dataFileExtension)
+      throws IOException {
+    FileStatus fileStatus = HadoopUtils.DEFAULT_FILE_SYSTEM.getFileStatus(inputDir);
+    Preconditions.checkState(fileStatus.isDirectory(), "Path: %s is not a directory", inputDir);
+    List<Path> dataFiles = new ArrayList<>();
+    getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputDir), dataFileExtension, dataFiles);
+    return dataFiles;
+  }
+
+  private static void getDataFilesHelper(FileStatus[] fileStatuses, String dataFileExtension, List<Path> dataFiles)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(path), dataFileExtension, dataFiles);
+      } else {
+        Preconditions.checkState(fileStatus.isFile(), "Path: %s is neither a directory nor a file", path);
+        if (path.getName().endsWith(dataFileExtension)) {
+          dataFiles.add(path);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
new file mode 100644
index 0000000..bf399af
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.util.Set;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class DataPreprocessingUtils {
+  private DataPreprocessingUtils() {
+  }
+
+  /**
+   * Converts a value into {@link WritableComparable} based on the given data type.
+   * <p>NOTE: The value must be a Number or a String; otherwise (including null) IllegalArgumentException is thrown.
+   */
+  public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
+    if (value instanceof Number) {
+      Number numberValue = (Number) value;
+      switch (dataType) {
+        case INT:
+          return new IntWritable(numberValue.intValue());
+        case LONG:
+          return new LongWritable(numberValue.longValue());
+        case FLOAT:
+          return new FloatWritable(numberValue.floatValue());
+        case DOUBLE:
+          return new DoubleWritable(numberValue.doubleValue());
+        case STRING:
+          return new Text(numberValue.toString());
+        default:
+          throw new IllegalArgumentException("Unsupported data type: " + dataType);
+      }
+    } else if (value instanceof String) {
+      String stringValue = (String) value;
+      switch (dataType) {
+        case INT:
+          return new IntWritable(Integer.parseInt(stringValue));
+        case LONG:
+          return new LongWritable(Long.parseLong(stringValue));
+        case FLOAT:
+          return new FloatWritable(Float.parseFloat(stringValue));
+        case DOUBLE:
+          return new DoubleWritable(Double.parseDouble(stringValue));
+        case STRING:
+          return new Text(stringValue);
+        default:
+          throw new IllegalArgumentException("Unsupported data type: " + dataType);
+      }
+    } else {
+      throw new IllegalArgumentException(String.format("Value: %s must be either a Number or a String, found: %s",
+          value, value == null ? null : value.getClass()));  // null fails both instanceof checks above
+    }
+  }
+
+  /** Adds each comma-separated operation name (trimmed, upper-cased with Locale.ROOT) to the set and returns it. */
+  public static Set<Operation> getOperations(Set<Operation> operationSet, String preprocessingOperationsString) {
+    for (String operationName : preprocessingOperationsString.split(",")) {
+      operationSet.add(Operation.getOperation(operationName.trim().toUpperCase(java.util.Locale.ROOT)));
+    }
+    return operationSet;
+  }
+
+  /** Supported data preprocessing operations. */
+  public enum Operation {
+    PARTITION, SORT, RESIZE;
+
+    /** Returns the operation with exactly the given name, or throws IllegalArgumentException if none matches. */
+    public static Operation getOperation(String operationString) {
+      for (Operation operation : Operation.values()) {
+        if (operation.name().equals(operationString)) {
+          return operation;
+        }
+      }
+      throw new IllegalArgumentException("Unsupported data preprocessing operation: " + operationString);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
new file mode 100644
index 0000000..0596259
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+
+public class HadoopUtils {
+  private HadoopUtils() {
+  }
+
+  // Both constants are created once, when this class is initialized.
+  public static final Configuration DEFAULT_CONFIGURATION = new Configuration();
+  public static final FileSystem DEFAULT_FILE_SYSTEM = openDefaultFileSystem();
+
+  private static FileSystem openDefaultFileSystem() {
+    try {
+      return FileSystem.get(DEFAULT_CONFIGURATION);
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to get the default file system", e);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
new file mode 100644
index 0000000..dcfc3b5
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.mapred.OrcTimestamp;
+
+
+public class OrcUtils {
+  private OrcUtils() {
+  }
+
+  /**
+   * Converts the ORC value into Number or String, throwing IllegalArgumentException for anything else (incl. null).
+   * <p>The following ORC types are supported:
+   * <ul>
+   *   <li>IntWritable -> Integer</li>
+   *   <li>LongWritable -> Long</li>
+   *   <li>FloatWritable -> Float</li>
+   *   <li>DoubleWritable -> Double</li>
+   *   <li>Text -> String</li>
+   *   <li>BooleanWritable -> String</li>
+   *   <li>ByteWritable -> Byte</li>
+   *   <li>ShortWritable -> Short</li>
+   *   <li>DateWritable -> Long</li>
+   *   <li>OrcTimestamp -> Long</li>
+   * </ul>
+   */
+  public static Object convert(WritableComparable orcValue) {
+    if (orcValue instanceof IntWritable) {
+      return ((IntWritable) orcValue).get();
+    }
+    if (orcValue instanceof LongWritable) {
+      return ((LongWritable) orcValue).get();
+    }
+    if (orcValue instanceof FloatWritable) {
+      return ((FloatWritable) orcValue).get();
+    }
+    if (orcValue instanceof DoubleWritable) {
+      return ((DoubleWritable) orcValue).get();
+    }
+    if (orcValue instanceof Text) {
+      return orcValue.toString();
+    }
+    if (orcValue instanceof BooleanWritable) {
+      return Boolean.toString(((BooleanWritable) orcValue).get());
+    }
+    if (orcValue instanceof ByteWritable) {
+      return ((ByteWritable) orcValue).get();
+    }
+    if (orcValue instanceof ShortWritable) {
+      return ((ShortWritable) orcValue).get();
+    }
+    if (orcValue instanceof DateWritable) {
+      return ((DateWritable) orcValue).get().getTime();
+    }
+    if (orcValue instanceof OrcTimestamp) {
+      return ((OrcTimestamp) orcValue).getTime();
+    }
+    throw new IllegalArgumentException(String.format("Illegal ORC value: %s, class: %s", orcValue,
+        orcValue == null ? null : orcValue.getClass()));  // null reaches here; avoid an NPE in the message
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
new file mode 100644
index 0000000..65f1222
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/**
+ * Override the Text comparison logic to compare with the decoded String instead of the byte array.
+ */
+public class TextComparator extends WritableComparator {
+  public TextComparator() {
+    super(Text.class);
+  }
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+    return StringUtils.decodeUtf8(b1, s1 + n1, l1 - n1).compareTo(StringUtils.decodeUtf8(b2, s2 + n2, l2 - n2));
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
index 2e9d98c..2e3b023 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
@@ -22,15 +22,11 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
-import java.util.zip.GZIPInputStream;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,26 +71,8 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
   protected abstract void run()
       throws Exception;
 
-  /**
-   * Helper method that returns avro reader for the given avro file.
-   * If file name ends in 'gz' then returns the GZIP version, otherwise gives the regular reader.
-   *
-   * @param avroFile File to read
-   * @return Avro reader for the file.
-   * @throws IOException exception when accessing to IO
-   */
-  protected DataFileStream<GenericRecord> getAvroReader(Path avroFile)
-      throws IOException {
-    FileSystem fs = FileSystem.get(new Configuration());
-    if (avroFile.getName().endsWith("gz")) {
-      return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>());
-    } else {
-      return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>());
-    }
-  }
-
   @Override
-  protected org.apache.pinot.spi.data.Schema getSchema()
+  protected Schema getSchema()
       throws IOException {
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
       if (controllerRestApi != null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index f966f9b..7af16df 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -502,6 +502,13 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
         throw new IllegalArgumentException(String.format("Cannot convert value: '%s' to type: %s", value, this));
       }
     }
+
+    /**
+     * Returns true unless this data type is BYTES, JSON, STRUCT, MAP or LIST, which cannot be sorted columns.
+     */
+    public boolean canBeASortedColumn() {
+      return !(this == BYTES || this == JSON || this == STRUCT || this == MAP || this == LIST);
+    }
   }
 
   @Override
diff --git a/pom.xml b/pom.xml
index a116022..d3013da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -950,6 +950,11 @@
         <classifier>hadoop2</classifier>
       </dependency>
       <dependency>
+        <groupId>org.apache.orc</groupId>
+        <artifactId>orc-mapreduce</artifactId>
+        <version>1.5.9</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.9.13</version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org