You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/06/15 21:06:38 UTC

[incubator-pinot] 01/01: Support data preprocessing for AVRO and ORC formats

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch support-orc-format-preprocessing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 07d12179170eda0abef6ac5d9ad17ed869fc2ec4
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Jun 15 10:02:47 2021 -0700

    Support data preprocessing for AVRO and ORC formats
---
 .../v0_deprecated/pinot-hadoop/pom.xml             |   4 +
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  | 371 +++++++++++++--------
 .../pinot/hadoop/job/InternalConfigConstants.java  |   3 +-
 .../job/mappers/AvroDataPreprocessingMapper.java   |  85 +++++
 .../job/mappers/OrcDataPreprocessingMapper.java    |  87 +++++
 .../job/mappers/SegmentPreprocessingMapper.java    |  53 ++-
 .../AvroDataPreprocessingPartitioner.java          |  77 +++++
 .../OrcDataPreprocessingPartitioner.java           |  83 +++++
 ...ucer.java => AvroDataPreprocessingReducer.java} |  45 +--
 ...ducer.java => OrcDataPreprocessingReducer.java} |  57 ++--
 .../hadoop/utils/preprocess/DataFileUtils.java     |  62 ++++
 .../utils/preprocess/DataPreprocessingUtils.java   |  76 +++++
 .../pinot/hadoop/utils/preprocess/HadoopUtils.java |  41 +++
 .../pinot/hadoop/utils/preprocess/OrcUtils.java    |  88 +++++
 .../hadoop/utils/preprocess/RawDataFormat.java     |  26 ++
 .../hadoop/utils/preprocess/TextComparator.java    |  41 +++
 pom.xml                                            |   5 +
 17 files changed, 1001 insertions(+), 203 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
index a0ea8ec..b8218e2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
@@ -193,6 +193,10 @@
       <classifier>hadoop2</classifier>
     </dependency>
     <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
+    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index b4e87fc..c0edae7 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -21,7 +21,6 @@ package org.apache.pinot.hadoop.job;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,15 +30,20 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.avro.mapreduce.AvroKeyOutputFormat;
 import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -48,11 +52,25 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
 import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
+import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
 import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
+import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.hadoop.utils.preprocess.RawDataFormat;
+import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
@@ -65,28 +83,36 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
+ * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which are raw data in either Avro or ORC format.
  * Thus, the output files are partitioned, sorted, resized after this job.
  * In order to run this job, the following configs need to be specified in job properties:
  * * enable.preprocessing: false by default. Enables preprocessing job.
  */
 public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
-  private static final Logger _logger = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
-  protected FileSystem _fileSystem;
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
+
   private String _partitionColumn;
   private int _numPartitions;
   private String _partitionFunction;
-  private String _sortedColumn;
+
+  private String _sortingColumn;
+  private FieldSpec.DataType _sortingColumnType;
+
   private int _numOutputFiles;
+  private int _maxNumRecordsPerFile;
+
   private TableConfig _tableConfig;
   private org.apache.pinot.spi.data.Schema _pinotTableSchema;
 
+  private Set<String> _preprocessingOperations;
+
   public HadoopSegmentPreprocessingJob(final Properties properties) {
     super(properties);
   }
@@ -94,56 +120,102 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   public void run()
       throws Exception {
     if (!_enablePreprocessing) {
-      _logger.info("Pre-processing job is disabled.");
+      LOGGER.info("Pre-processing job is disabled.");
       return;
     } else {
-      _logger.info("Starting {}", getClass().getSimpleName());
+      LOGGER.info("Starting {}", getClass().getSimpleName());
     }
 
-    _fileSystem = FileSystem.get(_inputSegmentDir.toUri(), getConf());
-    final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir);
-    Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory.");
-
-    if (_fileSystem.exists(_preprocessedOutputDir)) {
-      _logger.warn("Found output folder {}, deleting", _preprocessedOutputDir);
-      _fileSystem.delete(_preprocessedOutputDir, true);
-    }
     setTableConfigAndSchema();
-
-    _logger.info("Initializing a pre-processing job");
-    Job job = Job.getInstance(getConf());
-
-    Path sampleAvroPath = inputDataPaths.get(0);
-    int numInputPaths = inputDataPaths.size();
-
-    setValidationConfigs(job, sampleAvroPath);
-    setHadoopJobConfigs(job, numInputPaths);
-
-    // Avro Schema configs.
-    Schema avroSchema = getSchema(sampleAvroPath);
-    _logger.info("Schema is: {}", avroSchema.toString(true));
-    setSchemaParams(job, avroSchema);
-
-    // Set up mapper output key
-    Set<Schema.Field> fieldSet = new HashSet<>();
-
+    fetchPreProcessingOperations();
     fetchPartitioningConfig();
     fetchSortingConfig();
     fetchResizingConfig();
-    validateConfigsAgainstSchema(avroSchema);
 
-    // Partition configs.
+    // Cleans up the preprocessed output dir if it exists
+    cleanUpPreprocessedOutputs(_preprocessedOutputDir);
+
+    final List<Path> avroFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.AVRO_FILE_EXTENSION);
+    final List<Path> orcFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.ORC_FILE_EXTENSION);
+
+    int numAvroFiles = avroFiles.size();
+    int numOrcFiles = orcFiles.size();
+    Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+        _inputSegmentDir);
+    Preconditions
+        .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s",
+            _inputSegmentDir);
+    List<Path> inputDataPaths;
+    RawDataFormat rawDataFormat;
+    if (numAvroFiles > 0) {
+      rawDataFormat = RawDataFormat.AVRO;
+      inputDataPaths = avroFiles;
+      LOGGER.info("Find AVRO files: {} in directories: {}", avroFiles, _inputSegmentDir);
+    } else {
+      rawDataFormat = RawDataFormat.ORC;
+      inputDataPaths = orcFiles;
+      LOGGER.info("Find ORC files: {} in directories: {}", orcFiles, _inputSegmentDir);
+    }
+
+    LOGGER.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+    Configuration jobConf = job.getConfiguration();
+    // Input and output paths.
+    Path sampleRawDataPath = inputDataPaths.get(0);
+    int numInputPaths = inputDataPaths.size();
+    setValidationConfigs(job, sampleRawDataPath);
+    for (Path inputFile : inputDataPaths) {
+      FileInputFormat.addInputPath(job, inputFile);
+    }
+    setHadoopJobConfigs(job);
+
+    // Sorting column
+    if (_sortingColumn != null) {
+      LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+
+      switch (_sortingColumnType) {
+        case INT:
+          job.setMapOutputKeyClass(IntWritable.class);
+          break;
+        case LONG:
+          job.setMapOutputKeyClass(LongWritable.class);
+          break;
+        case FLOAT:
+          job.setMapOutputKeyClass(FloatWritable.class);
+          break;
+        case DOUBLE:
+          job.setMapOutputKeyClass(DoubleWritable.class);
+          break;
+        case STRING:
+          job.setMapOutputKeyClass(Text.class);
+          job.setSortComparatorClass(TextComparator.class);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    } else {
+      job.setMapOutputKeyClass(NullWritable.class);
+    }
+
+    // Partition column
     int numReduceTasks = 0;
     if (_partitionColumn != null) {
       numReduceTasks = _numPartitions;
-      job.getConfiguration().set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+      jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
       job.setPartitionerClass(GenericPartitioner.class);
-      job.getConfiguration().set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
       if (_partitionFunction != null) {
-        job.getConfiguration().set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+      }
+      jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+      if (rawDataFormat == RawDataFormat.AVRO) {
+        job.setPartitionerClass(AvroDataPreprocessingPartitioner.class);
+      } else {
+        job.setPartitionerClass(OrcDataPreprocessingPartitioner.class);
       }
-      job.getConfiguration().set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
-      setMaxNumRecordsConfigIfSpecified(job);
     } else {
       if (_numOutputFiles > 0) {
         numReduceTasks = _numOutputFiles;
@@ -151,45 +223,64 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         // default number of input paths
         numReduceTasks = inputDataPaths.size();
       }
-      // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
-      // so that all the rows can be spread evenly.
-      addHashCodeField(fieldSet);
     }
-    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
-    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    // Maximum number of records per output file
+    jobConf.set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
+    // Number of reducers
+    LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
     job.setNumReduceTasks(numReduceTasks);
 
-    // Sort config.
-    if (_sortedColumn != null) {
-      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
-      job.getConfiguration().set(InternalConfigConstants.SORTED_COLUMN_CONFIG, _sortedColumn);
-
-      addSortedColumnField(avroSchema, fieldSet);
+    // Mapper and reducer configs.
+    if (rawDataFormat == RawDataFormat.AVRO) {
+      Schema avroSchema = getAvroSchema(sampleRawDataPath);
+      LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+
+      // Validate the partitioning and sorting configs against the Avro schema
+      validateConfigsAgainstSchema(avroSchema);
+
+      job.setInputFormatClass(AvroKeyInputFormat.class);
+      job.setMapperClass(AvroDataPreprocessingMapper.class);
+      jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+
+      job.setReducerClass(AvroDataPreprocessingReducer.class);
+      AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
+      AvroMultipleOutputs.setCountersEnabled(job, true);
+      // Use LazyOutputFormat to avoid creating empty files.
+      LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+      job.setOutputKeyClass(AvroKey.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      AvroJob.setInputKeySchema(job, avroSchema);
+      AvroJob.setMapOutputValueSchema(job, avroSchema);
+      AvroJob.setOutputKeySchema(job, avroSchema);
     } else {
-      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
-      addHashCodeField(fieldSet);
+      String orcSchemaString = getOrcSchemaString(sampleRawDataPath);
+      LOGGER.info("Orc schema is: {}", orcSchemaString);
+
+      job.setInputFormatClass(OrcInputFormat.class);
+      job.setMapperClass(OrcDataPreprocessingMapper.class);
+      job.setMapOutputValueClass(OrcValue.class);
+      jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+      OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+
+      job.setReducerClass(OrcDataPreprocessingReducer.class);
+      // Use LazyOutputFormat to avoid creating empty files.
+      LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+      job.setOutputKeyClass(NullWritable.class);
+      job.setOutputValueClass(OrcStruct.class);
+      OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
     }
 
-    // Creates a wrapper for the schema of output key in mapper.
-    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
-    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
-    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
-
-    AvroJob.setInputKeySchema(job, avroSchema);
-    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
-    AvroJob.setMapOutputValueSchema(job, avroSchema);
-    AvroJob.setOutputKeySchema(job, avroSchema);
-
     // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
     // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
     // implemented so that you know what it does.
-    _logger.info("HDFS class path: " + _pathToDependencyJar);
+    LOGGER.info("HDFS class path: " + _pathToDependencyJar);
     if (_pathToDependencyJar != null) {
-      _logger.info("Copying jars locally.");
-      PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+      LOGGER.info("Copying jars locally.");
+      PinotHadoopJobPreparationHelper
+          .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, job, _pathToDependencyJar);
     } else {
-      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+      LOGGER.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
     }
 
     long startTime = System.currentTimeMillis();
@@ -199,11 +290,30 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
       throw new RuntimeException("Job failed : " + job);
     }
 
-    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+    LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+  }
+
+  private void fetchPreProcessingOperations() {
+    _preprocessingOperations = new HashSet<>();
+    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+    if (customConfig != null) {
+      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+      if (customConfigMap != null && !customConfigMap.isEmpty()) {
+        String preprocessingOperationsString = customConfigMap.getOrDefault("preprocessing.operations", "");
+        String[] preprocessingOpsArray = preprocessingOperationsString.split(",");
+        for (String preprocessingOps : preprocessingOpsArray) {
+          _preprocessingOperations.add(preprocessingOps.trim().toLowerCase());
+        }
+      }
+    }
   }
 
   private void fetchPartitioningConfig() {
     // Fetch partition info from table config.
+    if (!_preprocessingOperations.contains("partition")) {
+      LOGGER.info("Partitioning is disabled.");
+      return;
+    }
     SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
     if (segmentPartitionConfig != null) {
       Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
@@ -215,23 +325,39 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
       }
     } else {
-      _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
     }
   }
 
   private void fetchSortingConfig() {
+    if (!_preprocessingOperations.contains("sort")) {
+      LOGGER.info("Sorting is disabled.");
+      return;
+    }
     // Fetch sorting info from table config.
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     List<String> sortedColumns = indexingConfig.getSortedColumn();
     if (sortedColumns != null) {
       Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
       if (sortedColumns.size() == 1) {
-        _sortedColumn = sortedColumns.get(0);
+        _sortingColumn = sortedColumns.get(0);
+        FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+        Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn);
+        Preconditions
+            .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+        _sortingColumnType = fieldSpec.getDataType();
+        Preconditions.checkState(_sortingColumnType != FieldSpec.DataType.BYTES, "Cannot sort on BYTES column: %s",
+            _sortingColumn);
+        LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
       }
     }
   }
 
   private void fetchResizingConfig() {
+    if (!_preprocessingOperations.contains("resize")) {
+      LOGGER.info("Resizing is disabled.");
+      return;
+    }
     TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
     if (tableCustomConfig == null) {
       _numOutputFiles = 0;
@@ -246,14 +372,7 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     } else {
       _numOutputFiles = 0;
     }
-  }
 
-  private void setMaxNumRecordsConfigIfSpecified(Job job) {
-    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
-    if (tableCustomConfig == null) {
-      return;
-    }
-    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
     if (customConfigsMap != null && customConfigsMap
         .containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
       int maxNumRecords =
@@ -261,9 +380,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
       Preconditions.checkArgument(maxNumRecords > 0,
           "The value of " + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE
               + " should be positive. Current value: " + maxNumRecords);
-      _logger.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords);
-      job.getConfiguration()
-          .set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(maxNumRecords));
+      LOGGER.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords);
+      _maxNumRecordsPerFile = maxNumRecords;
     }
   }
 
@@ -273,13 +391,12 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
    * @return Input schema
    * @throws IOException exception when accessing to IO
    */
-  private Schema getSchema(Path inputPathDir)
+  private Schema getAvroSchema(Path inputPathDir)
       throws IOException {
-    FileSystem fs = FileSystem.get(new Configuration());
     Schema avroSchema = null;
-    for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
+    for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
       if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
-        _logger.info("Extracting schema from " + fileStatus.getPath());
+        LOGGER.info("Extracting schema from " + fileStatus.getPath());
         try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) {
           avroSchema = dataStreamReader.getSchema();
         }
@@ -289,19 +406,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     return avroSchema;
   }
 
-  private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) {
-    // Sorting is enabled. Adding sorted column value to mapper output key.
-    Schema sortedColumnSchema = schema.getField(_sortedColumn).schema();
-    Schema sortedColumnAsKeySchema;
-    if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) {
-      sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes());
-    } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) {
-      sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType());
-    } else {
-      sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType());
+  /**
+   * Reads the given ORC file and returns its schema as a string.
+   */
+  private String getOrcSchemaString(Path orcFile) {
+    String orcSchemaString;
+    try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
+      orcSchemaString = reader.getSchema().toString();
+    } catch (Exception e) {
+      throw new IllegalStateException("Caught exception while extracting ORC schema from file: " + orcFile, e);
     }
-    Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null);
-    fieldSet.add(columnField);
+    return orcSchemaString;
   }
 
   private void validateConfigsAgainstSchema(Schema schema) {
@@ -314,22 +429,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
       try {
         PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
       } catch (IllegalArgumentException e) {
-        _logger.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
+        LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
             _partitionColumn);
         throw new IllegalArgumentException(e);
       }
     }
-    if (_sortedColumn != null) {
-      Preconditions.checkArgument(schema.getField(_sortedColumn) != null,
-          String.format("Sorted column: %s is not found from the schema of input files.", _sortedColumn));
+    if (_sortingColumn != null) {
+      Preconditions.checkArgument(schema.getField(_sortingColumn) != null,
+          String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn));
     }
   }
 
-  private void addHashCodeField(Set<Schema.Field> fieldSet) {
-    Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null);
-    fieldSet.add(hashCodeField);
-  }
-
   @Override
   protected org.apache.pinot.spi.data.Schema getSchema()
       throws IOException {
@@ -376,12 +486,10 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
         if (dateTimeFieldSpec != null) {
           DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+          job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
           job.getConfiguration()
               .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+          job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
         }
       }
       job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
@@ -393,41 +501,30 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     }
   }
 
-  private void setHadoopJobConfigs(Job job, int numInputPaths) {
+  private void setHadoopJobConfigs(Job job) {
+    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+    job.setJobName(getClass().getName());
+    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
     job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
     // Turn this on to always firstly use class paths that user specifies.
     job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
     // Turn this off since we don't need an empty file in the output directory
     job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
 
-    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
-
     String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
     if (hadoopTokenFileLocation != null) {
       job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
     }
-
-    // Mapper configs.
-    job.setMapperClass(SegmentPreprocessingMapper.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-    job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths);
-
-    // Reducer configs.
-    job.setReducerClass(SegmentPreprocessingReducer.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
   }
 
-  private void setSchemaParams(Job job, Schema avroSchema)
+  /**
+   * Cleans up outputs in preprocessed output directory.
+   */
+  public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
       throws IOException {
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
-    // Input and output paths.
-    FileInputFormat.setInputPaths(job, _inputSegmentDir);
-    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+    if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+      LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+      HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
+    }
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index 3a7938d..c44f0f9 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -36,7 +36,8 @@ public class InternalConfigConstants {
   public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
   public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
 
-  public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+  public static final String SORTING_COLUMN_CONFIG = "sorting.column";
+  public static final String SORTING_COLUMN_TYPE = "sorting.type";
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
 
   // max records per file in each partition. No effect otherwise.
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
new file mode 100644
index 0000000..6278e8e
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroDataPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, WritableComparable, AvroValue<GenericRecord>> { // re-keys Avro records by the sorting column, if any
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingMapper.class);
+
+  private String _sortingColumn = null; // stays null when no sorting column is configured
+  private FieldSpec.DataType _sortingColumnType = null; // only assigned together with _sortingColumn
+  private AvroRecordExtractor _avroRecordExtractor; // created in setup()
+
+  @Override
+  public void setup(Context context) {
+    Configuration configuration = context.getConfiguration();
+    _avroRecordExtractor = new AvroRecordExtractor();
+    String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (sortingColumnConfig != null) { // NOTE(review): valueOf below presumably throws if the type config is unset - confirm
+      _sortingColumn = sortingColumnConfig;
+      _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+          _sortingColumnType);
+    } else {
+      LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
+    }
+  }
+
+  @Override
+  public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
+      throws IOException, InterruptedException {
+    GenericRecord record = key.datum(); // the record arrives in the input key; the input value is not read
+    if (_sortingColumn != null) { // sorting enabled: the column value becomes the map output key
+      Object object = record.get(_sortingColumn);
+      Preconditions
+          .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
+              record);
+      Object convertedValue = _avroRecordExtractor.convert(object); // a null result is rejected just below
+      Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+          _sortingColumn, record);
+      WritableComparable outputKey;
+      try {
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+      } catch (Exception e) { // rethrown with column/record context; the cause is preserved
+        throw new IllegalStateException(
+            String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+            e);
+      }
+      context.write(outputKey, new AvroValue<>(record));
+    } else {
+      context.write(NullWritable.get(), new AvroValue<>(record)); // no sorting: every record gets the NullWritable key
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
new file mode 100644
index 0000000..d7d0694
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct, WritableComparable, OrcValue> { // re-keys ORC rows by the sorting column, if any
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingMapper.class);
+
+  private final OrcValue _valueWrapper = new OrcValue(); // one wrapper instance, reassigned on every map() call
+  private String _sortingColumn = null; // stays null when no sorting column is configured
+  private FieldSpec.DataType _sortingColumnType = null; // only assigned together with _sortingColumn
+  private int _sortingColumnId = -1; // -1 until resolved from a struct's schema in map()
+
+  @Override
+  public void setup(Context context) {
+    Configuration configuration = context.getConfiguration();
+    String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (sortingColumnConfig != null) { // NOTE(review): valueOf below presumably throws if the type config is unset - confirm
+      _sortingColumn = sortingColumnConfig;
+      _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+          _sortingColumnType);
+    } else {
+      LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
+    }
+  }
+
+  @Override
+  public void map(NullWritable key, OrcStruct value, Context context)
+      throws IOException, InterruptedException {
+    _valueWrapper.value = value; // the incoming struct is emitted unchanged as the map output value
+    if (_sortingColumn != null) {
+      if (_sortingColumnId == -1) { // first keyed record only: look up the column's field index by name
+        List<String> fieldNames = value.getSchema().getFieldNames();
+        _sortingColumnId = fieldNames.indexOf(_sortingColumn);
+        Preconditions.checkState(_sortingColumnId != -1, "Failed to find sorting column: %s in the ORC fields: %s",
+            _sortingColumn, fieldNames);
+        LOGGER.info("Field id for sorting column: {} is: {}", _sortingColumn, _sortingColumnId);
+      }
+      WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
+      WritableComparable outputKey;
+      try {
+        outputKey = DataPreprocessingUtils
+            .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+      } catch (Exception e) { // rethrown with column/struct context; the cause is preserved
+        throw new IllegalStateException(String
+            .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
+                _sortingColumnId, value), e);
+      }
+      context.write(outputKey, _valueWrapper);
+    } else {
+      context.write(NullWritable.get(), _valueWrapper); // no sorting: every row gets the NullWritable key
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index e3f088d..6decee1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
+  private Configuration _jobConf;
   private String _sortedColumn = null;
   private String _timeColumn = null;
   private Schema _outputKeySchema;
@@ -52,43 +54,42 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
 
   @Override
   public void setup(final Context context) {
-    Configuration configuration = context.getConfiguration();
-
-    String tableName = configuration.get(JobConfigConstants.SEGMENT_TABLE_NAME);
-
-    _isAppend = configuration.get(InternalConfigConstants.IS_APPEND).equalsIgnoreCase("true");
+    _jobConf = context.getConfiguration(); // kept in a field because logConfigurations() iterates it
+    logConfigurations();
+    String tableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
 
+    _isAppend = "true".equalsIgnoreCase(_jobConf.get(InternalConfigConstants.IS_APPEND)); // literal-first compare is null-safe
     if (_isAppend) {
       // Get time column name
-      _timeColumn = configuration.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
+      _timeColumn = _jobConf.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
 
       // Get sample time column value
-      String timeColumnValue = configuration.get(InternalConfigConstants.TIME_COLUMN_VALUE);
-      String pushFrequency = configuration.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
+      String timeColumnValue = _jobConf.get(InternalConfigConstants.TIME_COLUMN_VALUE);
+      String pushFrequency = _jobConf.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
 
-      String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
-      String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
+      String timeType = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
+      String timeFormat = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
       DateTimeFormatSpec dateTimeFormatSpec;
       if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) {
         dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat);
       } else {
         dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat,
-            configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+            _jobConf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
       }
       _normalizedDateSegmentNameGenerator =
           new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec);
       _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
     }
 
-    String sortedColumn = configuration.get(InternalConfigConstants.SORTED_COLUMN_CONFIG);
+    String sortedColumn = _jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
     // Logging the configs for the mapper
     LOGGER.info("Sorted Column: " + sortedColumn);
     if (sortedColumn != null) {
       _sortedColumn = sortedColumn;
     }
-    _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration);
-    _outputSchema = AvroJob.getMapOutputValueSchema(configuration);
-    _enablePartitioning = Boolean.parseBoolean(configuration.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
+    _outputKeySchema = AvroJob.getMapOutputKeySchema(_jobConf);
+    _outputSchema = AvroJob.getMapOutputValueSchema(_jobConf);
+    _enablePartitioning = Boolean.parseBoolean(_jobConf.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
   }
 
   @Override
@@ -130,4 +131,26 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
       throw e;
     }
   }
+
+  protected void logConfigurations() { // logs every entry of _jobConf as a single "{k=v, k=v}" string
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append('{');
+    boolean firstEntry = true;
+    for (Map.Entry<String, String> entry : _jobConf) { // NOTE(review): values are logged verbatim - check for secrets
+      if (!firstEntry) { // separator goes before every entry except the first
+        stringBuilder.append(", ");
+      } else {
+        firstEntry = false;
+      }
+
+      stringBuilder.append(entry.getKey());
+      stringBuilder.append('=');
+      stringBuilder.append(entry.getValue());
+    }
+    stringBuilder.append('}');
+
+    LOGGER.info("*********************************************************************");
+    LOGGER.info("Job Configurations: {}", stringBuilder.toString());
+    LOGGER.info("*********************************************************************");
+  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..74799c7
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroDataPreprocessingPartitioner extends Partitioner<WritableComparable, AvroValue<GenericRecord>> implements Configurable { // partitions by a column of the Avro value
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn; // read from PARTITION_COLUMN_CONFIG in setConf()
+  private PartitionFunction _partitionFunction; // built in setConf() from the configured name and partition count
+  private AvroRecordExtractor _avroRecordExtractor;
+
+  @Override
+  public void setConf(Configuration conf) { // all partitioner state is initialised here
+    _conf = conf;
+    _avroRecordExtractor = new AvroRecordExtractor();
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    LOGGER.info(
+        "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, partitionFunctionName, numPartitions);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  @Override
+  public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
+    GenericRecord record = value.datum(); // the key is not consulted; only the record decides the partition
+    Object object = record.get(_partitionColumn);
+    Preconditions
+        .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
+            record);
+    Object convertedValue = _avroRecordExtractor.convert(object); // a null result is rejected just below
+    Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
+        _partitionColumn, record);
+    Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+        "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
+        convertedValue.getClass(), record);
+    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+    return _partitionFunction.getPartition(convertedValue.toString()); // NOTE(review): numPartitions arg is ignored - confirm
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..bef2cef
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingPartitioner extends Partitioner<WritableComparable, OrcValue> implements Configurable { // partitions by a column of the ORC value
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn; // read from PARTITION_COLUMN_CONFIG in setConf()
+  private PartitionFunction _partitionFunction; // built in setConf() from the configured name and partition count
+  private int _partitionColumnId = -1; // -1 until resolved from a struct's schema in getPartition()
+
+  @Override
+  public void setConf(Configuration conf) { // all configuration-derived state is initialised here
+    _conf = conf;
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    LOGGER.info(
+        "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, partitionFunctionName, numPartitions);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  @Override
+  public int getPartition(WritableComparable key, OrcValue value, int numPartitions) {
+    OrcStruct orcStruct = (OrcStruct) value.value; // presumably the struct wrapped by OrcDataPreprocessingMapper - confirm
+    if (_partitionColumnId == -1) { // first call only: look up the column's field index by name
+      List<String> fieldNames = orcStruct.getSchema().getFieldNames();
+      _partitionColumnId = fieldNames.indexOf(_partitionColumn);
+      Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s",
+          _partitionColumn, fieldNames);
+      LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
+    }
+    WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
+    Object convertedValue;
+    try {
+      convertedValue = OrcUtils.convert(partitionColumnValue);
+    } catch (Exception e) { // rethrown with column/struct context; the cause is preserved
+      throw new IllegalStateException(String
+          .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s", _partitionColumn,
+              _partitionColumnId, orcStruct), e);
+    }
+    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+    return _partitionFunction.getPartition(convertedValue.toString()); // NOTE(review): no null check, unlike the Avro partitioner
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
similarity index 63%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
index 9c07e6f..7e37ec5 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.hadoop.job.reducers;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
@@ -33,33 +32,43 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class AvroDataPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingReducer.class);
 
   private AvroMultipleOutputs _multipleOutputs;
-  private AtomicInteger _counter;
-  private int _maxNumberOfRecords;
+  private long _numRecords;
+  private int _maxNumRecordsPerFile;
   private String _filePrefix;
 
   @Override
   public void setup(Context context) {
-    LOGGER.info("Using multiple outputs strategy.");
     Configuration configuration = context.getConfiguration();
-    _multipleOutputs = new AvroMultipleOutputs(context);
-    _counter = new AtomicInteger();
     // If it's 0, the output file won't be split into multiple files.
     // If not, output file will be split when the number of records reaches this number.
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) { // split mode; in the else branch _multipleOutputs is never assigned
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new AvroMultipleOutputs(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4); // random 4-character prefix for the named output files
+      LOGGER.info("Initialized AvroDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized AvroDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
   public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final AvroValue<GenericRecord> value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile); // suffix grows by 1 every max records
+        _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+      }
+    } else {
+      for (final AvroValue<GenericRecord> value : values) {
+        context.write(new AvroKey<>(value.datum()), NullWritable.get()); // no limit: write through the context directly
+      }
     }
   }
 
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
similarity index 54%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
index 9c07e6f..70bf0dd 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
@@ -19,47 +19,56 @@
 package org.apache.pinot.hadoop.job.reducers;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class OrcDataPreprocessingReducer extends Reducer<WritableComparable, OrcValue, NullWritable, OrcStruct> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingReducer.class);
 
-  private AvroMultipleOutputs _multipleOutputs;
-  private AtomicInteger _counter;
-  private int _maxNumberOfRecords;
+  private int _maxNumRecordsPerFile;
+  private MultipleOutputs<NullWritable, OrcStruct> _multipleOutputs;
+  private long _numRecords;
   private String _filePrefix;
 
   @Override
   public void setup(Context context) {
-    LOGGER.info("Using multiple outputs strategy.");
     Configuration configuration = context.getConfiguration();
-    _multipleOutputs = new AvroMultipleOutputs(context);
-    _counter = new AtomicInteger();
     // If it's 0, the output file won't be split into multiple files.
     // If not, output file will be split when the number of records reaches this number.
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) { // split mode; in the else branch _multipleOutputs is never assigned
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new MultipleOutputs<>(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4); // random 4-character prefix for the named output files
+      LOGGER.info("Initialized OrcDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized OrcDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
-  public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+  public void reduce(WritableComparable key, Iterable<OrcValue> values, Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final OrcValue value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile); // suffix grows by 1 every max records
+        _multipleOutputs.write(NullWritable.get(), (OrcStruct) value.value, fileName);
+      }
+    } else {
+      for (final OrcValue value : values) {
+        context.write(NullWritable.get(), (OrcStruct) value.value); // no limit: write through the context directly
+      }
     }
   }
 
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
new file mode 100644
index 0000000..58e1c1d
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+
+public class DataFileUtils {
+  private DataFileUtils() {
+  }
+
+  public static final String AVRO_FILE_EXTENSION = ".avro";
+  public static final String ORC_FILE_EXTENSION = ".orc";
+
+  /**
+   * Returns the data files under the input directory with the given file extension.
+   */
+  public static List<Path> getDataFiles(Path inputDir, String dataFileExtension)
+      throws IOException {
+    FileStatus fileStatus = HadoopUtils.DEFAULT_FILE_SYSTEM.getFileStatus(inputDir);
+    Preconditions.checkState(fileStatus.isDirectory(), "Path: %s is not a directory", inputDir);
+    List<Path> dataFiles = new ArrayList<>();
+    getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputDir), dataFileExtension, dataFiles);
+    return dataFiles;
+  }
+
+  private static void getDataFilesHelper(FileStatus[] fileStatuses, String dataFileExtension, List<Path> dataFiles)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(path), dataFileExtension, dataFiles);
+      } else {
+        Preconditions.checkState(fileStatus.isFile(), "Path: %s is neither a directory nor a file", path);
+        if (path.getName().endsWith(dataFileExtension)) {
+          dataFiles.add(path);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
new file mode 100644
index 0000000..234e2d3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Helpers for converting raw values into Hadoop {@link WritableComparable} keys during data preprocessing.
+ */
+public class DataPreprocessingUtils {
+  private DataPreprocessingUtils() {
+  }
+
+  /**
+   * Converts a value into {@link WritableComparable} based on the given data type.
+   * <p>NOTE: The passed in value must be either a Number or a String.
+   *
+   * @param value the value to convert; must be a non-null {@link Number} or {@link String}
+   * @param dataType the target type; INT, LONG, FLOAT, DOUBLE and STRING are supported
+   * @return the matching writable (IntWritable, LongWritable, FloatWritable, DoubleWritable or Text)
+   * @throws IllegalArgumentException if the value is null or of another class, or the data type is unsupported
+   * @throws NumberFormatException if a String value cannot be parsed as the requested numeric type
+   */
+  public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
+    if (value instanceof Number) {
+      return convertNumber((Number) value, dataType);
+    }
+    if (value instanceof String) {
+      return convertString((String) value, dataType);
+    }
+    // A null value fails both instanceof checks; guard getClass() so the caller gets the descriptive
+    // IllegalArgumentException rather than a NullPointerException raised while building the message.
+    throw new IllegalArgumentException(
+        String.format("Value: %s must be either a Number or a String, found: %s", value,
+            value == null ? null : value.getClass()));
+  }
+
+  /** Narrows or widens a numeric value to the writable matching the data type. */
+  private static WritableComparable convertNumber(Number numberValue, FieldSpec.DataType dataType) {
+    switch (dataType) {
+      case INT:
+        return new IntWritable(numberValue.intValue());
+      case LONG:
+        return new LongWritable(numberValue.longValue());
+      case FLOAT:
+        return new FloatWritable(numberValue.floatValue());
+      case DOUBLE:
+        return new DoubleWritable(numberValue.doubleValue());
+      case STRING:
+        return new Text(numberValue.toString());
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+  }
+
+  /** Parses a String value into the writable matching the data type. */
+  private static WritableComparable convertString(String stringValue, FieldSpec.DataType dataType) {
+    switch (dataType) {
+      case INT:
+        return new IntWritable(Integer.parseInt(stringValue));
+      case LONG:
+        return new LongWritable(Long.parseLong(stringValue));
+      case FLOAT:
+        return new FloatWritable(Float.parseFloat(stringValue));
+      case DOUBLE:
+        return new DoubleWritable(Double.parseDouble(stringValue));
+      case STRING:
+        return new Text(stringValue);
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
new file mode 100644
index 0000000..0596259
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+
+/**
+ * Holder for the process-wide default Hadoop {@link Configuration} and the {@link FileSystem} obtained from it.
+ */
+public class HadoopUtils {
+  /** Configuration created with the no-arg {@link Configuration} constructor. */
+  public static final Configuration DEFAULT_CONFIGURATION = new Configuration();
+
+  /** File system obtained from {@link #DEFAULT_CONFIGURATION} once, when this class is initialized. */
+  public static final FileSystem DEFAULT_FILE_SYSTEM = openDefaultFileSystem();
+
+  private HadoopUtils() {
+  }
+
+  /**
+   * Looks up the file system for {@link #DEFAULT_CONFIGURATION}, translating the checked failure into an
+   * {@link IllegalStateException} so it can be used from a static field initializer.
+   */
+  private static FileSystem openDefaultFileSystem() {
+    try {
+      return FileSystem.get(DEFAULT_CONFIGURATION);
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to get the default file system", e);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
new file mode 100644
index 0000000..dcfc3b5
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.mapred.OrcTimestamp;
+
+
+/**
+ * Helpers for working with ORC values in the preprocessing job.
+ */
+public class OrcUtils {
+  private OrcUtils() {
+  }
+
+  /**
+   * Converts the ORC value into Number or String.
+   * <p>The following ORC types are supported:
+   * <ul>
+   *   <li>IntWritable -> Integer</li>
+   *   <li>LongWritable -> Long</li>
+   *   <li>FloatWritable -> Float</li>
+   *   <li>DoubleWritable -> Double</li>
+   *   <li>Text -> String</li>
+   *   <li>BooleanWritable -> String</li>
+   *   <li>ByteWritable -> Byte</li>
+   *   <li>ShortWritable -> Short</li>
+   *   <li>DateWritable -> Long</li>
+   *   <li>OrcTimestamp -> Long</li>
+   * </ul>
+   *
+   * @param orcValue the ORC value to convert; must be non-null and one of the types listed above
+   * @return the converted Number or String
+   * @throws IllegalArgumentException if the value is null or of an unsupported type
+   */
+  public static Object convert(WritableComparable orcValue) {
+    if (orcValue instanceof IntWritable) {
+      return ((IntWritable) orcValue).get();
+    }
+    if (orcValue instanceof LongWritable) {
+      return ((LongWritable) orcValue).get();
+    }
+    if (orcValue instanceof FloatWritable) {
+      return ((FloatWritable) orcValue).get();
+    }
+    if (orcValue instanceof DoubleWritable) {
+      return ((DoubleWritable) orcValue).get();
+    }
+    if (orcValue instanceof Text) {
+      return orcValue.toString();
+    }
+    if (orcValue instanceof BooleanWritable) {
+      // Booleans are surfaced as their String form ("true"/"false"), not as a Number.
+      return Boolean.toString(((BooleanWritable) orcValue).get());
+    }
+    if (orcValue instanceof ByteWritable) {
+      return ((ByteWritable) orcValue).get();
+    }
+    if (orcValue instanceof ShortWritable) {
+      return ((ShortWritable) orcValue).get();
+    }
+    if (orcValue instanceof DateWritable) {
+      return ((DateWritable) orcValue).get().getTime();
+    }
+    if (orcValue instanceof OrcTimestamp) {
+      return ((OrcTimestamp) orcValue).getTime();
+    }
+    // A null value fails every instanceof check above; guard getClass() so the caller gets the descriptive
+    // IllegalArgumentException rather than a NullPointerException raised while building the message.
+    throw new IllegalArgumentException(
+        String.format("Illegal ORC value: %s, class: %s", orcValue, orcValue == null ? null : orcValue.getClass()));
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
new file mode 100644
index 0000000..0ae3ca3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+/**
+ * The raw data formats currently supported by Pinot preprocessing jobs.
+ * <p>Only {@code AVRO} and {@code ORC} are declared.
+ */
+public enum RawDataFormat {
+  AVRO, ORC
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
new file mode 100644
index 0000000..65f1222
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/**
+ * Override the Text comparison logic to compare with the decoded String instead of the byte array.
+ */
+public class TextComparator extends WritableComparator {
+  public TextComparator() {
+    super(Text.class);
+  }
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+    return StringUtils.decodeUtf8(b1, s1 + n1, l1 - n1).compareTo(StringUtils.decodeUtf8(b2, s2 + n2, l2 - n2));
+  }
+}
diff --git a/pom.xml b/pom.xml
index aaf122f..7337462 100644
--- a/pom.xml
+++ b/pom.xml
@@ -944,6 +944,11 @@
         <classifier>hadoop2</classifier>
       </dependency>
       <dependency>
+        <groupId>org.apache.orc</groupId>
+        <artifactId>orc-mapreduce</artifactId>
+        <version>1.5.9</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.9.13</version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org