Posted to commits@pinot.apache.org by jl...@apache.org on 2021/06/15 21:22:51 UTC

[incubator-pinot] branch support-orc-format-preprocessing updated (07d1217 -> 83609aa)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch support-orc-format-preprocessing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 07d1217  Support data preprocessing for AVRO and ORC formats
     new 83609aa  Support data preprocessing for AVRO and ORC formats

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change, producing a repository history
that looks like this:

 * -- * -- B -- O -- O -- O   (07d1217)
            \
             N -- N -- N   refs/heads/support-orc-format-preprocessing (83609aa)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  | 22 ++++++++++++----------
 .../pinot/hadoop/job/InternalConfigConstants.java  |  7 +++++--
 .../job/reducers/AvroDataPreprocessingReducer.java |  2 +-
 .../job/reducers/OrcDataPreprocessingReducer.java  |  2 +-
 4 files changed, 19 insertions(+), 14 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Support data preprocessing for AVRO and ORC formats

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch support-orc-format-preprocessing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 83609aaccb1929b5aaea15b07b500fed501e6f62
Author: Jack Li (Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Jun 15 10:02:47 2021 -0700

    Support data preprocessing for AVRO and ORC formats
---
 .../v0_deprecated/pinot-hadoop/pom.xml             |   4 +
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  | 387 +++++++++++++--------
 .../pinot/hadoop/job/InternalConfigConstants.java  |  10 +-
 .../job/mappers/AvroDataPreprocessingMapper.java   |  85 +++++
 .../job/mappers/OrcDataPreprocessingMapper.java    |  87 +++++
 .../job/mappers/SegmentPreprocessingMapper.java    |  53 ++-
 .../AvroDataPreprocessingPartitioner.java          |  77 ++++
 .../OrcDataPreprocessingPartitioner.java           |  83 +++++
 ...ucer.java => AvroDataPreprocessingReducer.java} |  45 +--
 ...ducer.java => OrcDataPreprocessingReducer.java} |  57 +--
 .../hadoop/utils/preprocess/DataFileUtils.java     |  62 ++++
 .../utils/preprocess/DataPreprocessingUtils.java   |  76 ++++
 .../pinot/hadoop/utils/preprocess/HadoopUtils.java |  41 +++
 .../pinot/hadoop/utils/preprocess/OrcUtils.java    |  88 +++++
 .../hadoop/utils/preprocess/RawDataFormat.java     |  26 ++
 .../hadoop/utils/preprocess/TextComparator.java    |  41 +++
 pom.xml                                            |   5 +
 17 files changed, 1015 insertions(+), 212 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
index a0ea8ec..b8218e2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
@@ -193,6 +193,10 @@
       <classifier>hadoop2</classifier>
     </dependency>
     <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
+    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index b4e87fc..2bf603a 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -21,7 +21,6 @@ package org.apache.pinot.hadoop.job;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,15 +30,20 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.avro.mapreduce.AvroKeyOutputFormat;
 import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -48,11 +52,25 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
 import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
+import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
 import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
+import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.hadoop.utils.preprocess.RawDataFormat;
+import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
@@ -65,28 +83,36 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
+ * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in either Avro or Orc format.
  * Thus, the output files are partitioned, sorted, resized after this job.
  * In order to run this job, the following configs need to be specified in job properties:
  * * enable.preprocessing: false by default. Enables preprocessing job.
  */
 public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
-  private static final Logger _logger = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
-  protected FileSystem _fileSystem;
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
+
   private String _partitionColumn;
   private int _numPartitions;
   private String _partitionFunction;
-  private String _sortedColumn;
+
+  private String _sortingColumn;
+  private FieldSpec.DataType _sortingColumnType;
+
   private int _numOutputFiles;
+  private int _maxNumRecordsPerFile;
+
   private TableConfig _tableConfig;
   private org.apache.pinot.spi.data.Schema _pinotTableSchema;
 
+  private Set<String> _preprocessingOperations;
+
   public HadoopSegmentPreprocessingJob(final Properties properties) {
     super(properties);
   }
@@ -94,56 +120,102 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   public void run()
       throws Exception {
     if (!_enablePreprocessing) {
-      _logger.info("Pre-processing job is disabled.");
+      LOGGER.info("Pre-processing job is disabled.");
       return;
     } else {
-      _logger.info("Starting {}", getClass().getSimpleName());
+      LOGGER.info("Starting {}", getClass().getSimpleName());
     }
 
-    _fileSystem = FileSystem.get(_inputSegmentDir.toUri(), getConf());
-    final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir);
-    Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory.");
-
-    if (_fileSystem.exists(_preprocessedOutputDir)) {
-      _logger.warn("Found output folder {}, deleting", _preprocessedOutputDir);
-      _fileSystem.delete(_preprocessedOutputDir, true);
-    }
     setTableConfigAndSchema();
-
-    _logger.info("Initializing a pre-processing job");
-    Job job = Job.getInstance(getConf());
-
-    Path sampleAvroPath = inputDataPaths.get(0);
-    int numInputPaths = inputDataPaths.size();
-
-    setValidationConfigs(job, sampleAvroPath);
-    setHadoopJobConfigs(job, numInputPaths);
-
-    // Avro Schema configs.
-    Schema avroSchema = getSchema(sampleAvroPath);
-    _logger.info("Schema is: {}", avroSchema.toString(true));
-    setSchemaParams(job, avroSchema);
-
-    // Set up mapper output key
-    Set<Schema.Field> fieldSet = new HashSet<>();
-
+    fetchPreProcessingOperations();
     fetchPartitioningConfig();
     fetchSortingConfig();
     fetchResizingConfig();
-    validateConfigsAgainstSchema(avroSchema);
 
-    // Partition configs.
+    // Cleans up preprocessed output dir if exists
+    cleanUpPreprocessedOutputs(_preprocessedOutputDir);
+
+    final List<Path> avroFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.AVRO_FILE_EXTENSION);
+    final List<Path> orcFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.ORC_FILE_EXTENSION);
+
+    int numAvroFiles = avroFiles.size();
+    int numOrcFiles = orcFiles.size();
+    Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+        _inputSegmentDir);
+    Preconditions
+        .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s",
+            _inputSegmentDir);
+    List<Path> inputDataPaths;
+    RawDataFormat rawDataFormat;
+    if (numAvroFiles > 0) {
+      rawDataFormat = RawDataFormat.AVRO;
+      inputDataPaths = avroFiles;
+      LOGGER.info("Find AVRO files: {} in directories: {}", avroFiles, _inputSegmentDir);
+    } else {
+      rawDataFormat = RawDataFormat.ORC;
+      inputDataPaths = orcFiles;
+      LOGGER.info("Find ORC files: {} in directories: {}", orcFiles, _inputSegmentDir);
+    }
+
+    LOGGER.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+    Configuration jobConf = job.getConfiguration();
+    // Input and output paths.
+    Path sampleRawDataPath = inputDataPaths.get(0);
+    int numInputPaths = inputDataPaths.size();
+    setValidationConfigs(job, sampleRawDataPath);
+    for (Path inputFile : inputDataPaths) {
+      FileInputFormat.addInputPath(job, inputFile);
+    }
+    setHadoopJobConfigs(job);
+
+    // Sorting column
+    if (_sortingColumn != null) {
+      LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+
+      switch (_sortingColumnType) {
+        case INT:
+          job.setMapOutputKeyClass(IntWritable.class);
+          break;
+        case LONG:
+          job.setMapOutputKeyClass(LongWritable.class);
+          break;
+        case FLOAT:
+          job.setMapOutputKeyClass(FloatWritable.class);
+          break;
+        case DOUBLE:
+          job.setMapOutputKeyClass(DoubleWritable.class);
+          break;
+        case STRING:
+          job.setMapOutputKeyClass(Text.class);
+          job.setSortComparatorClass(TextComparator.class);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    } else {
+      job.setMapOutputKeyClass(NullWritable.class);
+    }
+
+    // Partition column
     int numReduceTasks = 0;
     if (_partitionColumn != null) {
       numReduceTasks = _numPartitions;
-      job.getConfiguration().set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+      jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
       job.setPartitionerClass(GenericPartitioner.class);
-      job.getConfiguration().set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
       if (_partitionFunction != null) {
-        job.getConfiguration().set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+      }
+      jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+      if (rawDataFormat == RawDataFormat.AVRO) {
+        job.setPartitionerClass(AvroDataPreprocessingPartitioner.class);
+      } else {
+        job.setPartitionerClass(OrcDataPreprocessingPartitioner.class);
       }
-      job.getConfiguration().set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
-      setMaxNumRecordsConfigIfSpecified(job);
     } else {
       if (_numOutputFiles > 0) {
         numReduceTasks = _numOutputFiles;
@@ -151,45 +223,65 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         // default number of input paths
         numReduceTasks = inputDataPaths.size();
       }
-      // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
-      // so that all the rows can be spread evenly.
-      addHashCodeField(fieldSet);
     }
-    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
-    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    // Maximum number of records per output file
+    jobConf
+        .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
+    // Number of reducers
+    LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
     job.setNumReduceTasks(numReduceTasks);
 
-    // Sort config.
-    if (_sortedColumn != null) {
-      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
-      job.getConfiguration().set(InternalConfigConstants.SORTED_COLUMN_CONFIG, _sortedColumn);
-
-      addSortedColumnField(avroSchema, fieldSet);
+    // Mapper and reducer configs.
+    if (rawDataFormat == RawDataFormat.AVRO) {
+      Schema avroSchema = getAvroSchema(sampleRawDataPath);
+      LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+
+      // Set up mapper output key
+      validateConfigsAgainstSchema(avroSchema);
+
+      job.setInputFormatClass(AvroKeyInputFormat.class);
+      job.setMapperClass(AvroDataPreprocessingMapper.class);
+      jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+
+      job.setReducerClass(AvroDataPreprocessingReducer.class);
+      AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
+      AvroMultipleOutputs.setCountersEnabled(job, true);
+      // Use LazyOutputFormat to avoid creating empty files.
+      LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+      job.setOutputKeyClass(AvroKey.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      AvroJob.setInputKeySchema(job, avroSchema);
+      AvroJob.setMapOutputValueSchema(job, avroSchema);
+      AvroJob.setOutputKeySchema(job, avroSchema);
     } else {
-      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
-      addHashCodeField(fieldSet);
+      String orcSchemaString = getOrcSchemaString(sampleRawDataPath);
+      LOGGER.info("Orc schema is: {}", orcSchemaString);
+
+      job.setInputFormatClass(OrcInputFormat.class);
+      job.setMapperClass(OrcDataPreprocessingMapper.class);
+      job.setMapOutputValueClass(OrcValue.class);
+      jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+      OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+
+      job.setReducerClass(OrcDataPreprocessingReducer.class);
+      // Use LazyOutputFormat to avoid creating empty files.
+      LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+      job.setOutputKeyClass(NullWritable.class);
+      job.setOutputValueClass(OrcStruct.class);
+      OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
     }
 
-    // Creates a wrapper for the schema of output key in mapper.
-    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
-    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
-    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
-
-    AvroJob.setInputKeySchema(job, avroSchema);
-    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
-    AvroJob.setMapOutputValueSchema(job, avroSchema);
-    AvroJob.setOutputKeySchema(job, avroSchema);
-
     // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
     // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
     // implemented so that you know what it does.
-    _logger.info("HDFS class path: " + _pathToDependencyJar);
+    LOGGER.info("HDFS class path: " + _pathToDependencyJar);
     if (_pathToDependencyJar != null) {
-      _logger.info("Copying jars locally.");
-      PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+      LOGGER.info("Copying jars locally.");
+      PinotHadoopJobPreparationHelper
+          .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, job, _pathToDependencyJar);
     } else {
-      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+      LOGGER.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
     }
 
     long startTime = System.currentTimeMillis();
@@ -199,11 +291,31 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
       throw new RuntimeException("Job failed : " + job);
     }
 
-    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+    LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+  }
+
+  private void fetchPreProcessingOperations() {
+    _preprocessingOperations = new HashSet<>();
+    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+    if (customConfig != null) {
+      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+      if (customConfigMap != null && !customConfigMap.isEmpty()) {
+        String preprocessingOperationsString =
+            customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+        String[] preprocessingOpsArray = preprocessingOperationsString.split(",");
+        for (String preprocessingOps : preprocessingOpsArray) {
+          _preprocessingOperations.add(preprocessingOps.trim().toLowerCase());
+        }
+      }
+    }
   }
 
   private void fetchPartitioningConfig() {
     // Fetch partition info from table config.
+    if (!_preprocessingOperations.contains("partition")) {
+      LOGGER.info("Partitioning is disabled.");
+      return;
+    }
     SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
     if (segmentPartitionConfig != null) {
       Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
@@ -215,55 +327,63 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
       }
     } else {
-      _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
     }
   }
 
   private void fetchSortingConfig() {
+    if (!_preprocessingOperations.contains("sort")) {
+      LOGGER.info("Sorting is disabled.");
+      return;
+    }
     // Fetch sorting info from table config.
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     List<String> sortedColumns = indexingConfig.getSortedColumn();
     if (sortedColumns != null) {
       Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
       if (sortedColumns.size() == 1) {
-        _sortedColumn = sortedColumns.get(0);
+        _sortingColumn = sortedColumns.get(0);
+        FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+        Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn);
+        Preconditions
+            .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+        _sortingColumnType = fieldSpec.getDataType();
+        Preconditions.checkState(_sortingColumnType != FieldSpec.DataType.BYTES, "Cannot sort on BYTES column: %s",
+            _sortingColumn);
+        LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
       }
     }
   }
 
   private void fetchResizingConfig() {
+    if (!_preprocessingOperations.contains("resize")) {
+      LOGGER.info("Resizing is disabled.");
+      return;
+    }
     TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
     if (tableCustomConfig == null) {
       _numOutputFiles = 0;
       return;
     }
     Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
-    if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESS_NUM_FILES)) {
-      _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESS_NUM_FILES));
+    if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+      _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
       Preconditions.checkState(_numOutputFiles > 0, String
-          .format("The value of %s should be positive! Current value: %s", InternalConfigConstants.PREPROCESS_NUM_FILES,
-              _numOutputFiles));
+          .format("The value of %s should be positive! Current value: %s",
+              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
     } else {
       _numOutputFiles = 0;
     }
-  }
 
-  private void setMaxNumRecordsConfigIfSpecified(Job job) {
-    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
-    if (tableCustomConfig == null) {
-      return;
-    }
-    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
     if (customConfigsMap != null && customConfigsMap
-        .containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+        .containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
       int maxNumRecords =
-          Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+          Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
       Preconditions.checkArgument(maxNumRecords > 0,
-          "The value of " + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE
+          "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
               + " should be positive. Current value: " + maxNumRecords);
-      _logger.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords);
-      job.getConfiguration()
-          .set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(maxNumRecords));
+      LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+      _maxNumRecordsPerFile = maxNumRecords;
     }
   }
 
@@ -273,13 +393,12 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
    * @return Input schema
    * @throws IOException exception when accessing to IO
    */
-  private Schema getSchema(Path inputPathDir)
+  private Schema getAvroSchema(Path inputPathDir)
       throws IOException {
-    FileSystem fs = FileSystem.get(new Configuration());
     Schema avroSchema = null;
-    for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
+    for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
       if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
-        _logger.info("Extracting schema from " + fileStatus.getPath());
+        LOGGER.info("Extracting schema from " + fileStatus.getPath());
         try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) {
           avroSchema = dataStreamReader.getSchema();
         }
@@ -289,19 +408,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     return avroSchema;
   }
 
-  private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) {
-    // Sorting is enabled. Adding sorted column value to mapper output key.
-    Schema sortedColumnSchema = schema.getField(_sortedColumn).schema();
-    Schema sortedColumnAsKeySchema;
-    if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) {
-      sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes());
-    } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) {
-      sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType());
-    } else {
-      sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType());
+  /**
+   * Finds the orc file and return its orc schema.
+   */
+  private String getOrcSchemaString(Path orcFile) {
+    String orcSchemaString;
+    try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
+      orcSchemaString = reader.getSchema().toString();
+    } catch (Exception e) {
+      throw new IllegalStateException("Caught exception while extracting ORC schema from file: " + orcFile, e);
     }
-    Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null);
-    fieldSet.add(columnField);
+    return orcSchemaString;
   }
 
   private void validateConfigsAgainstSchema(Schema schema) {
@@ -314,22 +431,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
       try {
         PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
       } catch (IllegalArgumentException e) {
-        _logger.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
+        LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
             _partitionColumn);
         throw new IllegalArgumentException(e);
       }
     }
-    if (_sortedColumn != null) {
-      Preconditions.checkArgument(schema.getField(_sortedColumn) != null,
-          String.format("Sorted column: %s is not found from the schema of input files.", _sortedColumn));
+    if (_sortingColumn != null) {
+      Preconditions.checkArgument(schema.getField(_sortingColumn) != null,
+          String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn));
     }
   }
 
-  private void addHashCodeField(Set<Schema.Field> fieldSet) {
-    Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null);
-    fieldSet.add(hashCodeField);
-  }
-
   @Override
   protected org.apache.pinot.spi.data.Schema getSchema()
       throws IOException {
@@ -376,12 +488,10 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
         if (dateTimeFieldSpec != null) {
           DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+          job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
           job.getConfiguration()
               .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+          job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
         }
       }
       job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
@@ -393,41 +503,30 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     }
   }
 
-  private void setHadoopJobConfigs(Job job, int numInputPaths) {
+  private void setHadoopJobConfigs(Job job) {
+    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+    job.setJobName(getClass().getName());
+    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
     job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
     // Turn this on to always firstly use class paths that user specifies.
     job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
     // Turn this off since we don't need an empty file in the output directory
     job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
 
-    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
-
     String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
     if (hadoopTokenFileLocation != null) {
       job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
     }
-
-    // Mapper configs.
-    job.setMapperClass(SegmentPreprocessingMapper.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-    job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths);
-
-    // Reducer configs.
-    job.setReducerClass(SegmentPreprocessingReducer.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
   }
 
-  private void setSchemaParams(Job job, Schema avroSchema)
+  /**
+   * Cleans up outputs in preprocessed output directory.
+   */
+  public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
       throws IOException {
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
-    // Input and output paths.
-    FileInputFormat.setInputPaths(job, _inputSegmentDir);
-    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+    if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+      LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+      HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
+    }
   }
 }
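
As the class Javadoc above notes, the preprocessing job only runs when the
enable.preprocessing property is set. A minimal launcher sketch, assuming the
remaining properties (input/output directories, table config and schema
endpoints, dependency-jar path) follow the existing SegmentPreprocessingJob
conventions and are supplied the same way as before:

    import java.util.Properties;
    import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;

    public class PreprocessingJobLauncherSketch {
      public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // "enable.preprocessing" is the flag documented in the class Javadoc; false by default.
        properties.setProperty("enable.preprocessing", "true");
        // The other job properties (input dir, output dir, table config/schema, deps jar path)
        // are omitted here and assumed to be set exactly as for the pre-existing job.
        new HadoopSegmentPreprocessingJob(properties).run();
      }
    }
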
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index 3a7938d..43009e8 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -31,19 +31,23 @@ public class InternalConfigConstants {
   public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
   public static final String SEGMENT_TIME_SDF_PATTERN = "segment.time.sdf.pattern";
 
+  // The operations of preprocessing that is enabled.
+  public static final String PREPROCESS_OPERATIONS = "preprocessing.operations";
+
   // Partitioning configs
   public static final String PARTITION_COLUMN_CONFIG = "partition.column";
   public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
   public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
 
-  public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+  public static final String SORTING_COLUMN_CONFIG = "sorting.column";
+  public static final String SORTING_COLUMN_TYPE = "sorting.type";
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
 
   // max records per file in each partition. No effect otherwise.
-  public static final String PARTITION_MAX_RECORDS_PER_FILE = "partition.max.records.per.file";
+  public static final String PREPROCESSING_MAX_NUM_RECORDS_PER_FILE = "preprocessing.max.num.records.per.file";
 
   // Number of segments we want generated.
-  public static final String PREPROCESS_NUM_FILES = "preprocess.num.files";
+  public static final String PREPROCESSING_NUM_REDUCERS = "preprocessing.num.reducers";
 
   public static final String FAIL_ON_SCHEMA_MISMATCH = "fail.on.schema.mismatch";
 }
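
The renamed keys above are read from the table's custom config by
fetchPreProcessingOperations(), fetchResizingConfig(), and the max-records
handling in HadoopSegmentPreprocessingJob. A sketch of the custom-config
entries a table might carry to enable all three operations; only the key names
and the operation tokens ("partition", "sort", "resize") come from this change,
the values are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class PreprocessingCustomConfigSketch {
      public static Map<String, String> exampleCustomConfigs() {
        Map<String, String> customConfigs = new HashMap<>();
        // Comma-separated list parsed by fetchPreProcessingOperations().
        customConfigs.put("preprocessing.operations", "partition,sort,resize");
        // Number of reducers / output files (PREPROCESSING_NUM_REDUCERS).
        customConfigs.put("preprocessing.num.reducers", "8");
        // Cap on records per output file (PREPROCESSING_MAX_NUM_RECORDS_PER_FILE).
        customConfigs.put("preprocessing.max.num.records.per.file", "1000000");
        return customConfigs;
      }
    }
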
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
new file mode 100644
index 0000000..6278e8e
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroDataPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, WritableComparable, AvroValue<GenericRecord>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingMapper.class);
+
+  private String _sortingColumn = null;
+  private FieldSpec.DataType _sortingColumnType = null;
+  private AvroRecordExtractor _avroRecordExtractor;
+
+  @Override
+  public void setup(Context context) {
+    Configuration configuration = context.getConfiguration();
+    _avroRecordExtractor = new AvroRecordExtractor();
+    String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (sortingColumnConfig != null) {
+      _sortingColumn = sortingColumnConfig;
+      _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+          _sortingColumnType);
+    } else {
+      LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
+    }
+  }
+
+  @Override
+  public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
+      throws IOException, InterruptedException {
+    GenericRecord record = key.datum();
+    if (_sortingColumn != null) {
+      Object object = record.get(_sortingColumn);
+      Preconditions
+          .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
+              record);
+      Object convertedValue = _avroRecordExtractor.convert(object);
+      Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+          _sortingColumn, record);
+      WritableComparable outputKey;
+      try {
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+            e);
+      }
+      context.write(outputKey, new AvroValue<>(record));
+    } else {
+      context.write(NullWritable.get(), new AvroValue<>(record));
+    }
+  }
+}
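
The mapper delegates key construction to
DataPreprocessingUtils.convertToWritableComparable(), which is added by this
commit (see the diffstat) but whose diff is not included in this excerpt. A
hypothetical approximation of what such a conversion does, consistent with the
map output key classes chosen in HadoopSegmentPreprocessingJob:

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.pinot.spi.data.FieldSpec;

    public class DataPreprocessingUtilsSketch {
      // Illustrative only; the actual DataPreprocessingUtils in this commit may differ.
      public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
        switch (dataType) {
          case INT:
            return new IntWritable(((Number) value).intValue());
          case LONG:
            return new LongWritable(((Number) value).longValue());
          case FLOAT:
            return new FloatWritable(((Number) value).floatValue());
          case DOUBLE:
            return new DoubleWritable(((Number) value).doubleValue());
          case STRING:
            return new Text(value.toString());
          default:
            throw new IllegalArgumentException("Unsupported sorting column type: " + dataType);
        }
      }
    }
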
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
new file mode 100644
index 0000000..d7d0694
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct, WritableComparable, OrcValue> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingMapper.class);
+
+  private final OrcValue _valueWrapper = new OrcValue();
+  private String _sortingColumn = null;
+  private FieldSpec.DataType _sortingColumnType = null;
+  private int _sortingColumnId = -1;
+
+  @Override
+  public void setup(Context context) {
+    Configuration configuration = context.getConfiguration();
+    String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (sortingColumnConfig != null) {
+      _sortingColumn = sortingColumnConfig;
+      _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+          _sortingColumnType);
+    } else {
+      LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
+    }
+  }
+
+  @Override
+  public void map(NullWritable key, OrcStruct value, Context context)
+      throws IOException, InterruptedException {
+    _valueWrapper.value = value;
+    if (_sortingColumn != null) {
+      if (_sortingColumnId == -1) {
+        List<String> fieldNames = value.getSchema().getFieldNames();
+        _sortingColumnId = fieldNames.indexOf(_sortingColumn);
+        Preconditions.checkState(_sortingColumnId != -1, "Failed to find sorting column: %s in the ORC fields: %s",
+            _sortingColumn, fieldNames);
+        LOGGER.info("Field id for sorting column: {} is: {}", _sortingColumn, _sortingColumnId);
+      }
+      WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
+      WritableComparable outputKey;
+      try {
+        outputKey = DataPreprocessingUtils
+            .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+      } catch (Exception e) {
+        throw new IllegalStateException(String
+            .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
+                _sortingColumnId, value), e);
+      }
+      context.write(outputKey, _valueWrapper);
+    } else {
+      context.write(NullWritable.get(), _valueWrapper);
+    }
+  }
+}
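
Similarly, the ORC mapper relies on OrcUtils.convert() to turn the raw ORC
field writable into a plain Java value before building the output key. The
OrcUtils.java added by this commit (88 lines in the diffstat) is not shown in
this excerpt; a rough, illustrative approximation covering only the simplest
writable types might look like:

    import org.apache.hadoop.io.BooleanWritable;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    public class OrcUtilsSketch {
      // Illustrative only; the real OrcUtils handles more ORC writable types than listed here.
      public static Object convert(WritableComparable orcValue) {
        if (orcValue instanceof IntWritable) {
          return ((IntWritable) orcValue).get();
        }
        if (orcValue instanceof LongWritable) {
          return ((LongWritable) orcValue).get();
        }
        if (orcValue instanceof FloatWritable) {
          return ((FloatWritable) orcValue).get();
        }
        if (orcValue instanceof DoubleWritable) {
          return ((DoubleWritable) orcValue).get();
        }
        if (orcValue instanceof BooleanWritable) {
          return ((BooleanWritable) orcValue).get();
        }
        if (orcValue instanceof Text) {
          return orcValue.toString();
        }
        throw new IllegalArgumentException("Unsupported ORC value type: " + orcValue.getClass());
      }
    }
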
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index e3f088d..6decee1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
+  private Configuration _jobConf;
   private String _sortedColumn = null;
   private String _timeColumn = null;
   private Schema _outputKeySchema;
@@ -52,43 +54,42 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
 
   @Override
   public void setup(final Context context) {
-    Configuration configuration = context.getConfiguration();
-
-    String tableName = configuration.get(JobConfigConstants.SEGMENT_TABLE_NAME);
-
-    _isAppend = configuration.get(InternalConfigConstants.IS_APPEND).equalsIgnoreCase("true");
+    _jobConf = context.getConfiguration();
+    logConfigurations();
+    String tableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
 
+    _isAppend = "true".equalsIgnoreCase(_jobConf.get(InternalConfigConstants.IS_APPEND));
     if (_isAppend) {
       // Get time column name
-      _timeColumn = configuration.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
+      _timeColumn = _jobConf.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
 
       // Get sample time column value
-      String timeColumnValue = configuration.get(InternalConfigConstants.TIME_COLUMN_VALUE);
-      String pushFrequency = configuration.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
+      String timeColumnValue = _jobConf.get(InternalConfigConstants.TIME_COLUMN_VALUE);
+      String pushFrequency = _jobConf.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
 
-      String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
-      String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
+      String timeType = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
+      String timeFormat = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
       DateTimeFormatSpec dateTimeFormatSpec;
       if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) {
         dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat);
       } else {
         dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat,
-            configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+            _jobConf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
       }
       _normalizedDateSegmentNameGenerator =
           new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec);
       _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
     }
 
-    String sortedColumn = configuration.get(InternalConfigConstants.SORTED_COLUMN_CONFIG);
+    String sortedColumn = _jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
     // Logging the configs for the mapper
     LOGGER.info("Sorted Column: " + sortedColumn);
     if (sortedColumn != null) {
       _sortedColumn = sortedColumn;
     }
-    _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration);
-    _outputSchema = AvroJob.getMapOutputValueSchema(configuration);
-    _enablePartitioning = Boolean.parseBoolean(configuration.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
+    _outputKeySchema = AvroJob.getMapOutputKeySchema(_jobConf);
+    _outputSchema = AvroJob.getMapOutputValueSchema(_jobConf);
+    _enablePartitioning = Boolean.parseBoolean(_jobConf.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
   }
 
   @Override
@@ -130,4 +131,26 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
       throw e;
     }
   }
+
+  protected void logConfigurations() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append('{');
+    boolean firstEntry = true;
+    for (Map.Entry<String, String> entry : _jobConf) {
+      if (!firstEntry) {
+        stringBuilder.append(", ");
+      } else {
+        firstEntry = false;
+      }
+
+      stringBuilder.append(entry.getKey());
+      stringBuilder.append('=');
+      stringBuilder.append(entry.getValue());
+    }
+    stringBuilder.append('}');
+
+    LOGGER.info("*********************************************************************");
+    LOGGER.info("Job Configurations: {}", stringBuilder.toString());
+    LOGGER.info("*********************************************************************");
+  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..74799c7
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroDataPreprocessingPartitioner extends Partitioner<WritableComparable, AvroValue<GenericRecord>> implements Configurable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn;
+  private PartitionFunction _partitionFunction;
+  private AvroRecordExtractor _avroRecordExtractor;
+
+  @Override
+  public void setConf(Configuration conf) {
+    _conf = conf;
+    _avroRecordExtractor = new AvroRecordExtractor();
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    LOGGER.info(
+        "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, partitionFunctionName, numPartitions);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  @Override
+  public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
+    GenericRecord record = value.datum();
+    Object object = record.get(_partitionColumn);
+    Preconditions
+        .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
+            record);
+    Object convertedValue = _avroRecordExtractor.convert(object);
+    Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
+        _partitionColumn, record);
+    Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+        "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
+        convertedValue.getClass(), record);
+    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+    return _partitionFunction.getPartition(convertedValue.toString());
+  }
+}
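
The comment at the end of getPartition() captures the key design choice: the
value is always stringified before being handed to the partition function so
that the partition computed here matches the String-based values the Broker
uses for segment pruning. A minimal usage sketch of that call pattern (the
function name "Murmur" and the partition count are example values only):

    import org.apache.pinot.hadoop.job.partitioners.PartitionFunctionFactory;
    import org.apache.pinot.segment.spi.partition.PartitionFunction;

    public class PartitionFunctionUsageSketch {
      public static void main(String[] args) {
        PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction("Murmur", 8);
        // Partition on the String form of the value, mirroring the partitioner above.
        int partitionId = partitionFunction.getPartition(String.valueOf(12345L));
        System.out.println("Partition id: " + partitionId);
      }
    }
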
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..bef2cef
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingPartitioner extends Partitioner<WritableComparable, OrcValue> implements Configurable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn;
+  private PartitionFunction _partitionFunction;
+  private int _partitionColumnId = -1;
+
+  @Override
+  public void setConf(Configuration conf) {
+    _conf = conf;
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    LOGGER.info(
+        "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, partitionFunctionName, numPartitions);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  @Override
+  public int getPartition(WritableComparable key, OrcValue value, int numPartitions) {
+    OrcStruct orcStruct = (OrcStruct) value.value;
+    if (_partitionColumnId == -1) {
+      List<String> fieldNames = orcStruct.getSchema().getFieldNames();
+      _partitionColumnId = fieldNames.indexOf(_partitionColumn);
+      Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s",
+          _partitionColumn, fieldNames);
+      LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
+    }
+    WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
+    Object convertedValue;
+    try {
+      convertedValue = OrcUtils.convert(partitionColumnValue);
+    } catch (Exception e) {
+      throw new IllegalStateException(String
+          .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s", _partitionColumn,
+              _partitionColumnId, orcStruct), e);
+    }
+    // NOTE: Always partition on the String representation of the value, because the Broker uses String values to prune segments
+    return _partitionFunction.getPartition(convertedValue.toString());
+  }
+}
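
The field id for the partition column is looked up once from the ORC schema and cached for subsequent records. A small sketch of that lookup and conversion path, against an assumed two-column schema:

    // Assumed schema and values; imports: org.apache.orc.TypeDescription,
    // org.apache.orc.mapred.OrcStruct, org.apache.hadoop.io.LongWritable,
    // org.apache.pinot.hadoop.utils.preprocess.OrcUtils.
    TypeDescription schema = TypeDescription.fromString("struct<memberId:bigint,country:string>");
    OrcStruct orcStruct = new OrcStruct(schema);
    orcStruct.setFieldValue("memberId", new LongWritable(12345L));
    int partitionColumnId = orcStruct.getSchema().getFieldNames().indexOf("memberId");    // 0, cached on first use
    Object convertedValue = OrcUtils.convert(orcStruct.getFieldValue(partitionColumnId)); // Long 12345
    // The partition function then hashes the String form: _partitionFunction.getPartition("12345").
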
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
similarity index 63%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
index 9c07e6f..5fcbf10 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.hadoop.job.reducers;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
@@ -33,33 +32,43 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class AvroDataPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingReducer.class);
 
   private AvroMultipleOutputs _multipleOutputs;
-  private AtomicInteger _counter;
-  private int _maxNumberOfRecords;
+  private long _numRecords;
+  private int _maxNumRecordsPerFile;
   private String _filePrefix;
 
   @Override
   public void setup(Context context) {
-    LOGGER.info("Using multiple outputs strategy.");
     Configuration configuration = context.getConfiguration();
-    _multipleOutputs = new AvroMultipleOutputs(context);
-    _counter = new AtomicInteger();
     // If it's 0, the output won't be split into multiple files.
     // Otherwise, a new output file is started whenever the record count reaches this limit.
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) {
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new AvroMultipleOutputs(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+      LOGGER.info("Initialized AvroDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized AvroDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
   public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final AvroValue<GenericRecord> value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+        _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+      }
+    } else {
+      for (final AvroValue<GenericRecord> value : values) {
+        context.write(new AvroKey<>(value.datum()), NullWritable.get());
+      }
     }
   }
 
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
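
With a positive limit, the reducer buckets records by dividing the running record count by the limit, so each output file holds at most maxNumRecordsPerFile records. A sketch of the naming arithmetic, with an assumed prefix and limit:

    // Assumed values; the prefix comes from RandomStringUtils.randomAlphanumeric(4) in setup().
    String filePrefix = "a1B2";
    int maxNumRecordsPerFile = 1_000_000;
    long numRecords = 0;
    String fileName = filePrefix + (numRecords++ / maxNumRecordsPerFile);
    // Records 0..999999 go to "a1B20", the next million to "a1B21", and so on.
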
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
similarity index 54%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
index 9c07e6f..a3387a2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
@@ -19,47 +19,56 @@
 package org.apache.pinot.hadoop.job.reducers;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class OrcDataPreprocessingReducer extends Reducer<WritableComparable, OrcValue, NullWritable, OrcStruct> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingReducer.class);
 
-  private AvroMultipleOutputs _multipleOutputs;
-  private AtomicInteger _counter;
-  private int _maxNumberOfRecords;
+  private int _maxNumRecordsPerFile;
+  private MultipleOutputs<NullWritable, OrcStruct> _multipleOutputs;
+  private long _numRecords;
   private String _filePrefix;
 
   @Override
   public void setup(Context context) {
-    LOGGER.info("Using multiple outputs strategy.");
     Configuration configuration = context.getConfiguration();
-    _multipleOutputs = new AvroMultipleOutputs(context);
-    _counter = new AtomicInteger();
     // If it's 0, the output won't be split into multiple files.
     // Otherwise, a new output file is started whenever the record count reaches this limit.
-    _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
-    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
-    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+    _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+    if (_maxNumRecordsPerFile > 0) {
+      LOGGER.info("Using multiple outputs strategy.");
+      _multipleOutputs = new MultipleOutputs<>(context);
+      _numRecords = 0L;
+      _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+      LOGGER.info("Initialized OrcDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+    } else {
+      LOGGER.info("Initialized OrcDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+    }
   }
 
   @Override
-  public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+  public void reduce(WritableComparable key, Iterable<OrcValue> values, Context context)
       throws IOException, InterruptedException {
-    for (final AvroValue<GenericRecord> value : values) {
-      String fileName = generateFileName();
-      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    if (_maxNumRecordsPerFile > 0) {
+      for (final OrcValue value : values) {
+        String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+        _multipleOutputs.write(NullWritable.get(), (OrcStruct) value.value, fileName);
+      }
+    } else {
+      for (final OrcValue value : values) {
+        context.write(NullWritable.get(), (OrcStruct) value.value);
+      }
     }
   }
 
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
     }
     LOGGER.info("Finished cleaning up reducer.");
   }
-
-  private String generateFileName() {
-    if (_maxNumberOfRecords == 0) {
-      return _filePrefix;
-    } else {
-      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
-    }
-  }
 }
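
Unlike the Avro reducer, this one emits NullWritable keys with OrcStruct values through the generic MultipleOutputs. A sketch of the driver-side settings that signature implies; the actual configuration is expected in HadoopSegmentPreprocessingJob and is not shown in this excerpt:

    // Assumed driver-side settings; 'job' is an org.apache.hadoop.mapreduce.Job, as in the
    // partitioner sketch above. Imports: org.apache.hadoop.io.NullWritable, org.apache.orc.mapred.OrcStruct.
    job.setReducerClass(OrcDataPreprocessingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(OrcStruct.class);
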
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
new file mode 100644
index 0000000..58e1c1d
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+
+public class DataFileUtils {
+  private DataFileUtils() {
+  }
+
+  public static final String AVRO_FILE_EXTENSION = ".avro";
+  public static final String ORC_FILE_EXTENSION = ".orc";
+
+  /**
+   * Returns the data files under the input directory with the given file extension.
+   */
+  public static List<Path> getDataFiles(Path inputDir, String dataFileExtension)
+      throws IOException {
+    FileStatus fileStatus = HadoopUtils.DEFAULT_FILE_SYSTEM.getFileStatus(inputDir);
+    Preconditions.checkState(fileStatus.isDirectory(), "Path: %s is not a directory", inputDir);
+    List<Path> dataFiles = new ArrayList<>();
+    getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputDir), dataFileExtension, dataFiles);
+    return dataFiles;
+  }
+
+  private static void getDataFilesHelper(FileStatus[] fileStatuses, String dataFileExtension, List<Path> dataFiles)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(path), dataFileExtension, dataFiles);
+      } else {
+        Preconditions.checkState(fileStatus.isFile(), "Path: %s is neither a directory nor a file", path);
+        if (path.getName().endsWith(dataFileExtension)) {
+          dataFiles.add(path);
+        }
+      }
+    }
+  }
+}
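
A usage sketch for the helper above, with an assumed input directory:

    // Imports: java.util.List, org.apache.hadoop.fs.Path; throws IOException.
    List<Path> avroFiles =
        DataFileUtils.getDataFiles(new Path("/user/pinot/rawdata"), DataFileUtils.AVRO_FILE_EXTENSION);
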
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
new file mode 100644
index 0000000..234e2d3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class DataPreprocessingUtils {
+  private DataPreprocessingUtils() {
+  }
+
+  /**
+   * Converts a value into {@link WritableComparable} based on the given data type.
+   * <p>NOTE: The passed in value must be either a Number or a String.
+   */
+  public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
+    if (value instanceof Number) {
+      Number numberValue = (Number) value;
+      switch (dataType) {
+        case INT:
+          return new IntWritable(numberValue.intValue());
+        case LONG:
+          return new LongWritable(numberValue.longValue());
+        case FLOAT:
+          return new FloatWritable(numberValue.floatValue());
+        case DOUBLE:
+          return new DoubleWritable(numberValue.doubleValue());
+        case STRING:
+          return new Text(numberValue.toString());
+        default:
+          throw new IllegalArgumentException("Unsupported data type: " + dataType);
+      }
+    } else if (value instanceof String) {
+      String stringValue = (String) value;
+      switch (dataType) {
+        case INT:
+          return new IntWritable(Integer.parseInt(stringValue));
+        case LONG:
+          return new LongWritable(Long.parseLong(stringValue));
+        case FLOAT:
+          return new FloatWritable(Float.parseFloat(stringValue));
+        case DOUBLE:
+          return new DoubleWritable(Double.parseDouble(stringValue));
+        case STRING:
+          return new Text(stringValue);
+        default:
+          throw new IllegalArgumentException("Unsupported data type: " + dataType);
+      }
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Value: %s must be either a Number or a String, found: %s", value, value.getClass()));
+    }
+  }
+}
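
A usage sketch: both Number and String inputs are mapped to the Writable matching the target Pinot data type.

    // Imports: org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.LongWritable,
    // org.apache.pinot.spi.data.FieldSpec.
    IntWritable fromNumber = (IntWritable) DataPreprocessingUtils.convertToWritableComparable(42, FieldSpec.DataType.INT);
    LongWritable fromString = (LongWritable) DataPreprocessingUtils.convertToWritableComparable("12345", FieldSpec.DataType.LONG);
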
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
new file mode 100644
index 0000000..0596259
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+
+public class HadoopUtils {
+  private HadoopUtils() {
+  }
+
+  public static final Configuration DEFAULT_CONFIGURATION;
+  public static final FileSystem DEFAULT_FILE_SYSTEM;
+
+  static {
+    DEFAULT_CONFIGURATION = new Configuration();
+    try {
+      DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to get the default file system", e);
+    }
+  }
+}
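
The shared file system is resolved once from the default Configuration (i.e. fs.defaultFS), so callers such as DataFileUtils can use it directly; for example (the path below is an assumption):

    // Import: org.apache.hadoop.fs.Path; throws IOException.
    boolean exists = HadoopUtils.DEFAULT_FILE_SYSTEM.exists(new Path("/user/pinot/rawdata"));
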
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
new file mode 100644
index 0000000..dcfc3b5
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.mapred.OrcTimestamp;
+
+
+public class OrcUtils {
+  private OrcUtils() {
+  }
+
+  /**
+   * Converts the given ORC value into a Number or a String.
+   * <p>The following ORC types are supported:
+   * <ul>
+   *   <li>IntWritable -> Integer</li>
+   *   <li>LongWritable -> Long</li>
+   *   <li>FloatWritable -> Float</li>
+   *   <li>DoubleWritable -> Double</li>
+   *   <li>Text -> String</li>
+   *   <li>BooleanWritable -> String</li>
+   *   <li>ByteWritable -> Byte</li>
+   *   <li>ShortWritable -> Short</li>
+   *   <li>DateWritable -> Long</li>
+   *   <li>OrcTimestamp -> Long</li>
+   * </ul>
+   */
+  public static Object convert(WritableComparable orcValue) {
+    if (orcValue instanceof IntWritable) {
+      return ((IntWritable) orcValue).get();
+    }
+    if (orcValue instanceof LongWritable) {
+      return ((LongWritable) orcValue).get();
+    }
+    if (orcValue instanceof FloatWritable) {
+      return ((FloatWritable) orcValue).get();
+    }
+    if (orcValue instanceof DoubleWritable) {
+      return ((DoubleWritable) orcValue).get();
+    }
+    if (orcValue instanceof Text) {
+      return orcValue.toString();
+    }
+    if (orcValue instanceof BooleanWritable) {
+      return Boolean.toString(((BooleanWritable) orcValue).get());
+    }
+    if (orcValue instanceof ByteWritable) {
+      return ((ByteWritable) orcValue).get();
+    }
+    if (orcValue instanceof ShortWritable) {
+      return ((ShortWritable) orcValue).get();
+    }
+    if (orcValue instanceof DateWritable) {
+      return ((DateWritable) orcValue).get().getTime();
+    }
+    if (orcValue instanceof OrcTimestamp) {
+      return ((OrcTimestamp) orcValue).getTime();
+    }
+    throw new IllegalArgumentException(
+        String.format("Illegal ORC value: %s, class: %s", orcValue, orcValue.getClass()));
+  }
+}
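
A few conversions from the list above, as a quick sketch:

    // Imports: org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text, org.apache.hadoop.io.BooleanWritable.
    Object i = OrcUtils.convert(new IntWritable(7));         // Integer 7
    Object s = OrcUtils.convert(new Text("pinot"));          // String "pinot"
    Object b = OrcUtils.convert(new BooleanWritable(true));  // String "true"
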
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
new file mode 100644
index 0000000..0ae3ca3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+/**
+ * The data formats currently supported by Pinot preprocessing jobs
+ */
+public enum RawDataFormat {
+  AVRO, ORC
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
new file mode 100644
index 0000000..65f1222
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/**
+ * Overrides the default Text comparison logic to compare the decoded Strings instead of the raw byte arrays.
+ */
+public class TextComparator extends WritableComparator {
+  public TextComparator() {
+    super(Text.class);
+  }
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+    return StringUtils.decodeUtf8(b1, s1 + n1, l1 - n1).compareTo(StringUtils.decodeUtf8(b2, s2 + n2, l2 - n2));
+  }
+}
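
A sketch of how this comparator would typically be registered; the actual registration is expected in HadoopSegmentPreprocessingJob when the sort column is a STRING, and is not shown in this excerpt:

    // Assumed driver-side registration; 'job' is an org.apache.hadoop.mapreduce.Job.
    job.setSortComparatorClass(TextComparator.class);
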
diff --git a/pom.xml b/pom.xml
index aaf122f..7337462 100644
--- a/pom.xml
+++ b/pom.xml
@@ -944,6 +944,11 @@
         <classifier>hadoop2</classifier>
       </dependency>
       <dependency>
+        <groupId>org.apache.orc</groupId>
+        <artifactId>orc-mapreduce</artifactId>
+        <version>1.5.9</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.9.13</version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org