You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/27 18:52:25 UTC

[incubator-pinot] branch master updated: Add segment pre-processing Hadoop job (#4253)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 89a3449  Add segment pre-processing Hadoop job (#4253)
89a3449 is described below

commit 89a344998bce7aeaeda8a4a49d5df175bdb82185
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu Jun 27 11:52:21 2019 -0700

    Add segment pre-processing Hadoop job (#4253)
    
    * Adds partitioning, sorting and resizing functionalities before Hadoop Pinot segment generation
---
 .../apache/pinot/common/config/TableConfig.java    |   8 +-
 .../data/partition/MurmurPartitionFunction.java    |   2 +-
 pinot-hadoop/pom.xml                               |   5 +
 .../pinot/hadoop/PinotHadoopJobLauncher.java       |   7 +-
 .../pinot/hadoop/io/CombineAvroKeyInputFormat.java |  57 +++
 .../pinot/hadoop/job/JobConfigConstants.java       |   5 +
 .../pinot/hadoop/job/SegmentCreationJob.java       |  33 +-
 .../pinot/hadoop/job/SegmentPreprocessingJob.java  | 483 +++++++++++++++++++++
 .../{mapper => mappers}/SegmentCreationMapper.java |   2 +-
 .../job/mappers/SegmentPreprocessingMapper.java    |  85 ++++
 .../job/partitioners/GenericPartitioner.java       |  63 +++
 .../job/partitioners/PartitionFunctionFactory.java |  98 +++++
 .../job/reducers/SegmentPreprocessingReducer.java  |  84 ++++
 .../pinot/hadoop/utils/JobPreparationHelper.java   |  58 +++
 .../tests/BaseClusterIntegrationTest.java          |  15 +
 .../pinot/integration/tests/ClusterTest.java       |  23 +-
 .../ControllerPeriodicTasksIntegrationTests.java   |   4 +-
 ...mentBuildPushOfflineClusterIntegrationTest.java |  87 +++-
 .../tests/HybridClusterIntegrationTest.java        |   2 +-
 ...ridClusterIntegrationTestCommandLineRunner.java |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |   6 +-
 .../tests/SimpleMinionClusterIntegrationTest.java  |   6 +-
 pom.xml                                            |   6 +
 23 files changed, 1087 insertions(+), 54 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
index 2f4ee1c..502a7a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
@@ -457,6 +457,7 @@ public class TableConfig {
     private TableTaskConfig _taskConfig;
     private RoutingConfig _routingConfig;
     private HllConfig _hllConfig;
+    private SegmentPartitionConfig _segmentPartitionConfig;
     private StarTreeIndexSpec _starTreeIndexSpec;
 
     public Builder(TableType tableType) {
@@ -609,6 +610,11 @@ public class TableConfig {
       return this;
     }
 
+    public Builder setSegmentPartitionConfig(SegmentPartitionConfig segmentPartitionConfig) {
+      _segmentPartitionConfig = segmentPartitionConfig;
+      return this;
+    }
+
     public TableConfig build() {
       // Validation config
       SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
@@ -646,7 +652,7 @@ public class TableConfig {
       StreamConsumptionConfig streamConsumptionConfig = new StreamConsumptionConfig();
       streamConsumptionConfig.setStreamPartitionAssignmentStrategy(_streamPartitionAssignmentStrategy);
       indexingConfig.setStreamConsumptionConfig(streamConsumptionConfig);
-      // TODO: set SegmentPartitionConfig here
+      indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
 
       if (_customConfig == null) {
         _customConfig = new TableCustomConfig();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java
index fd522fb..24da4dd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java
@@ -35,7 +35,7 @@ public class MurmurPartitionFunction implements PartitionFunction {
    * @param numPartitions Number of partitions.
    */
   public MurmurPartitionFunction(int numPartitions) {
-    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, specified", numPartitions);
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
     _numPartitions = numPartitions;
   }
 
diff --git a/pinot-hadoop/pom.xml b/pinot-hadoop/pom.xml
index cfcfc05..f6c2b55 100644
--- a/pinot-hadoop/pom.xml
+++ b/pinot-hadoop/pom.xml
@@ -118,6 +118,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>hadoop2</classifier>
+    </dependency>
+    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
index 2c7b0da..9f59d6d 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
@@ -21,6 +21,7 @@ package org.apache.pinot.hadoop;
 import java.io.FileInputStream;
 import java.util.Arrays;
 import java.util.Properties;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
 import org.apache.pinot.hadoop.job.SegmentCreationJob;
 import org.apache.pinot.hadoop.job.SegmentTarPushJob;
 import org.apache.pinot.hadoop.job.SegmentUriPushJob;
@@ -29,7 +30,8 @@ import org.apache.pinot.hadoop.job.SegmentUriPushJob;
 public class PinotHadoopJobLauncher {
 
   enum PinotHadoopJobType {
-    SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush
+    SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush,
+    SegmentPreprocessing
   }
 
   private static final String USAGE = "usage: [job_type] [job.properties]";
@@ -61,6 +63,9 @@ public class PinotHadoopJobLauncher {
         new SegmentCreationJob(jobConf).run();
         new SegmentUriPushJob(jobConf).run();
         break;
+      case SegmentPreprocessing:
+        new SegmentPreprocessingJob(jobConf).run();
+        break;
       default:
         throw new RuntimeException("Not a valid jobType - " + jobType);
     }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/CombineAvroKeyInputFormat.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/CombineAvroKeyInputFormat.java
new file mode 100644
index 0000000..a36f8db
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/CombineAvroKeyInputFormat.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.io;
+
+import java.io.IOException;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+
+/**
+ * Create Hadoop splits across files.
+ * By default, hadoop cannot create splits across files. As a result, if the input has lots of small avro files,
+ * the job will end up having lots of short running mappers, which is inefficient.
+ * This class allows creating splits across avro files so we can have larger splits and fewer long running mappers,
+ * which improves the performance of the Hadoop job.
+ */
+public class CombineAvroKeyInputFormat<T> extends CombineFileInputFormat<AvroKey<T>, NullWritable> {
+  @Override
+  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException {
+    Class cls = AvroKeyRecordReaderWrapper.class;
+
+    return new CombineFileRecordReader<>((CombineFileSplit) split, context,
+        (Class<? extends RecordReader<AvroKey<T>, NullWritable>>) cls);
+  }
+
+  public static class AvroKeyRecordReaderWrapper<T> extends CombineFileRecordReaderWrapper<AvroKey<T>, NullWritable> {
+    public AvroKeyRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
+        throws IOException, InterruptedException {
+      super(new AvroKeyInputFormat<>(), split, context, index);
+    }
+  }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 35557b5..19b350c 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -21,6 +21,7 @@ package org.apache.pinot.hadoop.job;
 public class JobConfigConstants {
   public static final String PATH_TO_INPUT = "path.to.input";
   public static final String PATH_TO_OUTPUT = "path.to.output";
+  public static final String PREPROCESS_PATH_TO_OUTPUT = "preprocess.path.to.output";
   public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
   public static final String PATH_TO_READER_CONFIG = "path.to.reader.config";
   // Leave this for backward compatibility. We prefer to use the schema fetched from the controller.
@@ -52,4 +53,8 @@ public class JobConfigConstants {
 
   // The path to the record reader to be configured
   public static final String RECORD_READER_PATH = "record.reader.path";
+
+  public static final String ENABLE_PARTITIONING = "enable.partitioning";
+  public static final String ENABLE_SORTING = "enable.sorting";
+  public static final String ENABLE_RESIZING = "enable.resizing";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index 997dfec..b4e56ef 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobContext;
@@ -46,7 +45,8 @@ import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.hadoop.job.mapper.SegmentCreationMapper;
+import org.apache.pinot.hadoop.job.mappers.SegmentCreationMapper;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
 import org.apache.pinot.hadoop.utils.PushLocation;
 
 
@@ -202,16 +202,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
     }
     _logger.info("Making directory: {}", dirPath);
     _fileSystem.mkdirs(dirPath);
-    setDirPermission(dirPath);
-  }
-
-  protected void setDirPermission(Path dirPath)
-      throws IOException {
-    if (_defaultPermissionsMask != null) {
-      FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(_defaultPermissionsMask));
-      _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
-      _fileSystem.setPermission(dirPath, permission);
-    }
+    JobPreparationHelper.setDirPermission(_fileSystem, dirPath, _defaultPermissionsMask);
   }
 
   @Nullable
@@ -263,23 +254,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
   protected void addDepsJarToDistributedCache(Job job)
       throws IOException {
     if (_depsJarDir != null) {
-      addDepsJarToDistributedCacheHelper(job, _depsJarDir);
-    }
-  }
-
-  protected void addDepsJarToDistributedCacheHelper(Job job, Path depsJarDir)
-      throws IOException {
-    FileStatus[] fileStatuses = _fileSystem.listStatus(depsJarDir);
-    for (FileStatus fileStatus : fileStatuses) {
-      if (fileStatus.isDirectory()) {
-        addDepsJarToDistributedCacheHelper(job, fileStatus.getPath());
-      } else {
-        Path depJarPath = fileStatus.getPath();
-        if (depJarPath.getName().endsWith(".jar")) {
-          _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
-          job.addCacheArchive(depJarPath.toUri());
-        }
-      }
+      JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _depsJarDir);
     }
   }
 
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
new file mode 100644
index 0000000..aadeb3b
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableCustomConfig;
+import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
+import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
+import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
+import org.apache.pinot.hadoop.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+import static org.apache.hadoop.security.UserGroupInformation.*;
+
+
+/**
+ * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
+ * Thus, the output files are partitioned, sorted, resized after this job.
+ * In order to run this job, the following configs need to be specified in job properties:
+ * * enable.partitioning: false by default. If enabled, this job will fetch the partition config from table config.
+ * * enable.sorting: false by default. If enabled, the job will fetch sorted column from table config.
+ * * enable.resizing: false by default. If partitioning is enabled, this is force to be disabled. If enabled, min.num.output.files is required.
+ * * min.num.output.files: null by default. It's required if resizing is enabled.
+ * * max.num.records: null by default. The output file will be split into multiple files if the number of records exceeded max.num.records.
+ */
+public class SegmentPreprocessingJob extends BaseSegmentJob {
+  private static final Logger _logger = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
+
+  private boolean _enablePartitioning = false;
+  private boolean _enableSorting = false;
+  private boolean _enableResizing = false;
+
+  private String _partitionColumn;
+  private int _numberOfPartitions;
+  private String _partitionFunction;
+  private String _sortedColumn;
+  private int _numberOfOutputFiles;
+
+  private final Path _inputSegmentDir;
+  private final Path _preprocessedOutputDir;
+  protected final String _rawTableName;
+  protected final List<PushLocation> _pushLocations;
+
+  // Optional.
+  private final Path _pathToDependencyJar;
+  private final String _defaultPermissionsMask;
+
+  private TableConfig _tableConfig;
+  protected FileSystem _fileSystem;
+
+  public SegmentPreprocessingJob(final Properties properties) {
+    super(properties);
+
+    if (_properties.getProperty(JobConfigConstants.ENABLE_PARTITIONING) != null) {
+      _enablePartitioning = Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_PARTITIONING));
+    }
+
+    if (_properties.getProperty(JobConfigConstants.ENABLE_SORTING) != null) {
+      _enableSorting = Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_SORTING));
+    }
+
+    boolean resizingForcedDisabled = false;
+    if (_properties.getProperty(JobConfigConstants.ENABLE_RESIZING) != null) {
+      if (!_enablePartitioning) {
+        _enableResizing = Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_RESIZING));
+      } else {
+        resizingForcedDisabled = true;
+        _logger.warn("Resizing cannot be enabled since partitioning has already been enabled!");
+      }
+    }
+
+    // get input/output paths.
+    _inputSegmentDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT));
+    _preprocessedOutputDir =
+        Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT));
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+
+    _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+    _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK, null);
+
+    // Optional push location and table parameters. If set, will use the table config and schema from the push hosts.
+    String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+    String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+    if (pushHostsString != null && pushPortString != null) {
+      _pushLocations =
+          PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+    } else {
+      throw new RuntimeException(String
+          .format("Push location is mis-configured! %s: %s, %s: %s", JobConfigConstants.PUSH_TO_HOSTS, pushHostsString,
+              JobConfigConstants.PUSH_TO_PORT, pushPortString));
+    }
+
+    _logger.info("*********************************************************************");
+    _logger.info("enable.partitioning: {}", _enablePartitioning);
+    _logger.info("enable.sorting: {}", _enableSorting);
+    _logger.info("enable.resizing: {} {}", _enableResizing,
+        (resizingForcedDisabled ? "(forced to be disabled since partitioning is enabled)" : ""));
+    _logger.info("path.to.input: {}", _inputSegmentDir);
+    _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
+    _logger.info("path.to.deps.jar: {}", _pathToDependencyJar);
+    _logger.info("push.locations: {}", _pushLocations);
+    _logger.info("*********************************************************************");
+  }
+
+  public void run()
+      throws Exception {
+    if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
+      _logger.info("Pre-processing job is disabled.");
+      return;
+    } else {
+      _logger.info("Starting {}", getClass().getSimpleName());
+    }
+
+    _fileSystem = FileSystem.get(_conf);
+    final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
+
+    if (_fileSystem.exists(_preprocessedOutputDir)) {
+      _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
+      _fileSystem.delete(_preprocessedOutputDir, true);
+    }
+    JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
+
+    _tableConfig = getTableConfig();
+    Preconditions.checkArgument(_tableConfig != null, "Table config shouldn't be null");
+
+    if (_enablePartitioning) {
+      fetchPartitioningConfig();
+      _logger.info("partition.column: {}", _partitionColumn);
+      _logger.info("num.partitions: {}", _numberOfPartitions);
+      _logger.info("partition.function: {}", _partitionColumn);
+    }
+
+    if (_enableSorting) {
+      fetchSortingConfig();
+      _logger.info("sorted.column: {}", _sortedColumn);
+    }
+
+    if (_enableResizing) {
+      fetchResizingConfig();
+      _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
+    }
+
+    _logger.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(_conf);
+
+    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+    // Turn this on to always firstly use class paths that user specifies.
+    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+    // Turn this off since we don't need an empty file in the output directory
+    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+    job.setJarByClass(SegmentPreprocessingJob.class);
+
+    String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+    if (hadoopTokenFileLocation != null) {
+      job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+    }
+
+    // Schema configs.
+    Schema schema = getSchema(inputDataPath.get(0));
+    _logger.info("Schema is: {}", schema.toString(true));
+
+    // Validates configs against schema.
+    validateConfigsAgainstSchema(schema);
+
+    // Mapper configs.
+    job.setMapperClass(SegmentPreprocessingMapper.class);
+    job.setMapOutputKeyClass(AvroKey.class);
+    job.setMapOutputValueClass(AvroValue.class);
+    job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
+
+    // Reducer configs.
+    job.setReducerClass(SegmentPreprocessingReducer.class);
+    job.setOutputKeyClass(AvroKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
+    AvroMultipleOutputs.setCountersEnabled(job, true);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+
+    // Input and output paths.
+    FileInputFormat.setInputPaths(job, _inputSegmentDir);
+    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+    _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
+
+    // Set up mapper output key
+    Set<Schema.Field> fieldSet = new HashSet<>();
+
+    // Partition configs.
+    int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
+    if (_partitionColumn != null) {
+      job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
+      job.setPartitionerClass(GenericPartitioner.class);
+      job.getConfiguration().set("partition.column", _partitionColumn);
+      if (_partitionFunction != null) {
+        job.getConfiguration().set("partition.function", _partitionFunction);
+      }
+      job.getConfiguration().set("num.partitions", Integer.toString(numReduceTasks));
+    } else {
+      if (_numberOfOutputFiles > 0) {
+        numReduceTasks = _numberOfOutputFiles;
+      }
+      // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
+      // so that all the rows can be spread evenly.
+      addHashCodeField(fieldSet);
+    }
+    setMaxNumRecordsConfigIfSpecified(job);
+    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
+
+    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    job.setNumReduceTasks(numReduceTasks);
+
+    // Sort config.
+    if (_sortedColumn != null) {
+      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
+      job.getConfiguration().set("sorted.column", _sortedColumn);
+
+      addSortedColumnField(schema, fieldSet);
+    } else {
+      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
+      addHashCodeField(fieldSet);
+    }
+
+    // Creates a wrapper for the schema of output key in mapper.
+    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
+    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
+    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+
+    AvroJob.setInputKeySchema(job, schema);
+    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
+    AvroJob.setMapOutputValueSchema(job, schema);
+    AvroJob.setOutputKeySchema(job, schema);
+
+    // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
+    // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
+    // implemented so that you know what it does.
+    _logger.info("HDFS class path: " + _pathToDependencyJar);
+    if (_pathToDependencyJar != null) {
+      _logger.info("Copying jars locally.");
+      JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+    } else {
+      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+    }
+
+    long startTime = System.currentTimeMillis();
+    // Submit the job for execution.
+    job.waitForCompletion(true);
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("Job failed : " + job);
+    }
+
+    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+  }
+
+  @Nullable
+  private TableConfig getTableConfig()
+      throws IOException {
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
+    }
+  }
+
+  /**
+   * Can be overridden to provide custom controller Rest API.
+   */
+  @Nullable
+  private ControllerRestApi getControllerRestApi() {
+    return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
+  }
+
+  private void fetchPartitioningConfig() {
+    // Fetch partition info from table config.
+    SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+      Preconditions.checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table.");
+      if (columnPartitionMap.size() == 1) {
+        _partitionColumn = columnPartitionMap.keySet().iterator().next();
+        _numberOfPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
+        _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+      }
+    } else {
+      _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+    }
+  }
+
+  private void fetchSortingConfig() {
+    // Fetch sorting info from table config.
+    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+    List<String> sortedColumns = indexingConfig.getSortedColumn();
+    Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
+    if (sortedColumns.size() == 1) {
+      _sortedColumn = sortedColumns.get(0);
+    }
+  }
+
+  private void fetchResizingConfig() {
+    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+    if (tableCustomConfig == null) {
+      _numberOfOutputFiles = 0;
+      return;
+    }
+    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
+    if (customConfigsMap != null && customConfigsMap.containsKey("min.num.output.files")) {
+      _numberOfOutputFiles = Integer.parseInt(customConfigsMap.get("min.num.output.files"));
+      Preconditions.checkArgument(_numberOfOutputFiles > 0, String.format("The value of min.num.output.files should be positive! Current value: %s", customConfigsMap.get("min.num.output.files")));
+    } else {
+      _numberOfOutputFiles = 0;
+    }
+  }
+
+  private void setMaxNumRecordsConfigIfSpecified(Job job) {
+    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+    if (tableCustomConfig == null) {
+      return;
+    }
+    Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
+    if (customConfigsMap != null && customConfigsMap.containsKey("max.num.records")) {
+      int maxNumRecords = Integer.parseInt(customConfigsMap.get("max.num.records"));
+      Preconditions.checkArgument(maxNumRecords > 0, "The value of max.num.records should be positive. Current value: " + customConfigsMap.get("max.num.records"));
+      _logger.info("Setting max.num.records to {}", maxNumRecords);
+      job.getConfiguration().set("max.num.records", Integer.toString(maxNumRecords));
+    }
+  }
+
+  /**
+   * Finds the avro file in the input folder, and returns its avro schema
+   * @param inputPathDir Path to input directory
+   * @return Input schema
+   * @throws IOException exception when accessing to IO
+   */
+  private static Schema getSchema(Path inputPathDir)
+      throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    Schema avroSchema = null;
+    for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
+      if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
+        _logger.info("Extracting schema from " + fileStatus.getPath());
+        avroSchema = extractSchemaFromAvro(fileStatus.getPath());
+        break;
+      }
+    }
+    return avroSchema;
+  }
+
+  /**
+   * Extracts avro schema from avro file
+   * @param avroFile Input avro file
+   * @return Schema in avro file
+   * @throws IOException exception when accessing to IO
+   */
+  private static Schema extractSchemaFromAvro(Path avroFile)
+      throws IOException {
+    DataFileStream<GenericRecord> dataStreamReader = getAvroReader(avroFile);
+    Schema avroSchema = dataStreamReader.getSchema();
+    dataStreamReader.close();
+    return avroSchema;
+  }
+
+  /**
+   * Helper method that returns avro reader for the given avro file.
+   * If file name ends in 'gz' then returns the GZIP version, otherwise gives the regular reader.
+   *
+   * @param avroFile File to read
+   * @return Avro reader for the file.
+   * @throws IOException exception when accessing to IO
+   */
+  private static DataFileStream<GenericRecord> getAvroReader(Path avroFile)
+      throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    if (avroFile.getName().endsWith("gz")) {
+      return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>());
+    } else {
+      return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>());
+    }
+  }
+
+  private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) {
+    // Sorting is enabled. Adding sorted column value to mapper output key.
+    Schema sortedColumnSchema = schema.getField(_sortedColumn).schema();
+    Schema sortedColumnAsKeySchema;
+    if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) {
+      sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes());
+    } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) {
+      sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType());
+    } else {
+      sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType());
+    }
+    Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null);
+    fieldSet.add(columnField);
+  }
+
+  private void validateConfigsAgainstSchema(Schema schema) {
+    if (_enablePartitioning) {
+      Preconditions.checkArgument(_partitionColumn != null, "Partition column should not be null!");
+      Preconditions.checkArgument(schema.getField(_partitionColumn) != null, String
+          .format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
+      Preconditions.checkArgument(_numberOfPartitions > 0, String.format("Number of partitions should be positive. Current value: %s", _numberOfPartitions));
+      Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
+    }
+    if (_enableSorting) {
+      Preconditions.checkArgument(_sortedColumn != null, "Sorted column should not be null!");
+      Preconditions.checkArgument(schema.getField(_sortedColumn) != null, String
+          .format("Sorted column: %s is not found from the schema of input files.", _sortedColumn));
+    }
+  }
+
+  private void addHashCodeField(Set<Schema.Field> fieldSet) {
+    Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null);
+    fieldSet.add(hashCodeField);
+  }
+
+  @Override
+  protected boolean isDataFile(String fileName) {
+    // TODO: support orc format in the future.
+    return fileName.endsWith(".avro");
+  }
+
+  void cleanup() {
+    _logger.info("Clean up pre-processing output path: {}", _preprocessedOutputDir);
+    try {
+      _fileSystem.delete(_preprocessedOutputDir, true);
+    } catch (IOException e) {
+      _logger.error("Failed to clean up pre-processing output path: {}", _preprocessedOutputDir);
+    }
+  }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
similarity index 99%
rename from pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
rename to pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 2fe3537..ccf752c 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mapper;
+package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
new file mode 100644
index 0000000..2c4d012
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
+
+public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
+  private String _sortedColumn = null;
+  private Schema _outputKeySchema;
+  private Schema _outputSchema;
+  private boolean _enablePartition = false;
+
+  @Override
+  public void setup(final Context context) {
+    Configuration configuration = context.getConfiguration();
+
+    String sortedColumn = configuration.get("sorted.column");
+    // Logging the configs for the mapper
+    LOGGER.info("Sorted Column: " + sortedColumn);
+    if (sortedColumn != null) {
+      _sortedColumn = sortedColumn;
+    }
+    _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration);
+    _outputSchema = AvroJob.getMapOutputValueSchema(configuration);
+    _enablePartition = Boolean.parseBoolean(configuration.get(ENABLE_PARTITIONING));
+    LOGGER.info("Enable partitioning? " + _enablePartition);
+  }
+
+  @Override
+  public void map(AvroKey<GenericRecord> record, NullWritable value, final Context context)
+      throws IOException, InterruptedException {
+    final GenericRecord inputRecord = record.datum();
+    final Schema schema = inputRecord.getSchema();
+    Preconditions.checkArgument(_outputSchema.equals(schema), "The schema of all avro files should be the same!");
+
+    GenericRecord outputKey = new GenericData.Record(_outputKeySchema);
+    if (_sortedColumn == null) {
+      outputKey.put("hashcode", inputRecord.hashCode());
+    } else if (_enablePartition) {
+      outputKey.put(_sortedColumn, inputRecord.get(_sortedColumn));
+    } else {
+      outputKey.put(_sortedColumn, inputRecord.get(_sortedColumn));
+      outputKey.put("hashcode", inputRecord.hashCode());
+    }
+
+    try {
+      context.write(new AvroKey<>(outputKey), new AvroValue<>(inputRecord));
+    } catch (Exception e) {
+      LOGGER.error("Exception when writing context on mapper!");
+      throw e;
+    }
+  }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
new file mode 100644
index 0000000..43d1396
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(GenericPartitioner.class);
+  private Configuration _configuration;
+  private String _partitionColumn;
+  private int _numPartitions;
+  private PartitionFunction _partitionFunction;
+
+  @Override
+  public void setConf(Configuration conf) {
+    _configuration = conf;
+    _partitionColumn = _configuration.get("partition.column");
+    _numPartitions = Integer.parseInt(_configuration.get("num.partitions"));
+    _partitionFunction =
+        PartitionFunctionFactory.getPartitionFunction(_configuration.get("partition.function", null), _numPartitions);
+
+    LOGGER.info("The partition function is: " + _partitionFunction.getClass().getName());
+    LOGGER.info("The partition column is: " + _partitionColumn);
+    LOGGER.info("Total number of partitions is: " + _numPartitions);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _configuration;
+  }
+
+  @Override
+  public int getPartition(T genericRecordAvroKey, AvroValue<GenericRecord> genericRecordAvroValue, int numPartitions) {
+    final GenericRecord inputRecord = genericRecordAvroValue.datum();
+    final Object partitionColumnValue = inputRecord.get(_partitionColumn);
+    return _partitionFunction.getPartition(partitionColumnValue);
+  }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
new file mode 100644
index 0000000..54c0bbc
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.data.partition.ByteArrayPartitionFunction;
+import org.apache.pinot.core.data.partition.HashCodePartitionFunction;
+import org.apache.pinot.core.data.partition.ModuloPartitionFunction;
+import org.apache.pinot.core.data.partition.MurmurPartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+
+
+public class PartitionFunctionFactory {
+
+  private PartitionFunctionFactory() {
+  }
+
+  public enum PartitionFunctionType {
+    Modulo,
+    Murmur,
+    HashCode,
+    ByteArray;
+    // Add more functions here.
+
+    private static final Map<String, PartitionFunctionType> VALUE_MAP = new HashMap<>();
+
+    static {
+      for (PartitionFunctionType functionType : PartitionFunctionType.values()) {
+        VALUE_MAP.put(functionType.name().toLowerCase(), functionType);
+      }
+    }
+
+    public static PartitionFunctionType fromString(String name) {
+      PartitionFunctionType functionType = VALUE_MAP.get(name.toLowerCase());
+
+      if (functionType == null) {
+        throw new IllegalArgumentException("No enum constant for: " + name);
+      }
+      return functionType;
+    }
+  }
+
+  /**
+   * This method generates and returns a partition function based on the provided string.
+   *
+   * @param functionName Name of partition function
+   * @return Partition function
+   */
+  public static PartitionFunction getPartitionFunction(String functionName, int numPartitions) {
+
+    // Return default partition function if there is no configuration given.
+    if (functionName == null) {
+      return new MurmurPartitionFunction(numPartitions);
+    }
+
+    PartitionFunctionType functionType;
+    try {
+      functionType = PartitionFunctionType.fromString(functionName);
+    } catch (IllegalArgumentException e) {
+      // By default, we use murmur
+      functionType = PartitionFunctionType.Murmur;
+    }
+
+    switch (functionType) {
+      case Modulo:
+        return new ModuloPartitionFunction(numPartitions);
+
+      case Murmur:
+        return new MurmurPartitionFunction(numPartitions);
+
+      case HashCode:
+        return new HashCodePartitionFunction(numPartitions);
+
+      case ByteArray:
+        return new ByteArrayPartitionFunction(numPartitions);
+
+      default:
+        return new MurmurPartitionFunction(numPartitions);
+    }
+  }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
new file mode 100644
index 0000000..d07e98f
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.reducers;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+
+  private AvroMultipleOutputs _multipleOutputs;
+  private AtomicInteger _counter;
+  private int _maxNumberOfRecords;
+  private String _filePrefix;
+
+  @Override
+  public void setup(Context context) {
+    LOGGER.info("Using multiple outputs strategy.");
+    Configuration configuration = context.getConfiguration();
+    _multipleOutputs = new AvroMultipleOutputs(context);
+    _counter = new AtomicInteger();
+    // If it's 0, the output file won't be split into multiple files.
+    // If not, output file will be split when the number of records reaches this number.
+    _maxNumberOfRecords = configuration.getInt("max.num.records", 0);
+    LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
+    _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+  }
+
+  @Override
+  public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+      throws IOException, InterruptedException {
+    for (final AvroValue<GenericRecord> value : values) {
+      String fileName = generateFileName();
+      _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+    }
+  }
+
+  @Override
+  public void cleanup(Context context)
+      throws IOException, InterruptedException {
+    LOGGER.info("Clean up reducer.");
+    if (_multipleOutputs != null) {
+      _multipleOutputs.close();
+      _multipleOutputs = null;
+    }
+    LOGGER.info("Finished cleaning up reducer.");
+  }
+
+  private String generateFileName() {
+    if (_maxNumberOfRecords == 0) {
+      return _filePrefix;
+    } else {
+      return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
+    }
+  }
+}
+
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/JobPreparationHelper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/JobPreparationHelper.java
new file mode 100644
index 0000000..bd726d2
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/JobPreparationHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobPreparationHelper {
+  private static final Logger _logger = LoggerFactory.getLogger(JobPreparationHelper.class);
+
+  public static void addDepsJarToDistributedCacheHelper(FileSystem fileSystem, Job job, Path depsJarDir)
+      throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(depsJarDir);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        addDepsJarToDistributedCacheHelper(fileSystem, job, fileStatus.getPath());
+      } else {
+        Path depJarPath = fileStatus.getPath();
+        if (depJarPath.getName().endsWith(".jar")) {
+          _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+          job.addCacheArchive(depJarPath.toUri());
+        }
+      }
+    }
+  }
+
+  public static void setDirPermission(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+      throws IOException {
+    if (defaultPermissionsMask != null) {
+      FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(defaultPermissionsMask));
+      _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+      fileSystem.setPermission(dirPath, permission);
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 21c2140..914adda 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -24,13 +24,17 @@ import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableTaskConfig;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -67,6 +71,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
   protected final File _avroDir = new File(_tempDir, "avroDir");
+  protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
   protected List<KafkaServerStartable> _kafkaStarters;
@@ -188,6 +193,16 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return TagNameUtils.DEFAULT_TENANT_NAME;
   }
 
+  protected SegmentPartitionConfig getSegmentPartitionConfig() {
+
+    ColumnPartitionConfig columnPartitionConfig = new ColumnPartitionConfig("murmur", 2);
+    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+    columnPartitionConfigMap.put("AirlineID", columnPartitionConfig);
+
+    SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(columnPartitionConfigMap);
+    return segmentPartitionConfig;
+  }
+
   /**
    * Get the Pinot connection.
    *
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 0457159..d8fb9b9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -46,6 +46,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.http.HttpStatus;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TableTaskConfig;
@@ -307,16 +308,17 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected void addOfflineTable(String tableName, SegmentVersion segmentVersion)
       throws Exception {
-    addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null);
+    addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null);
   }
 
   protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
       String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      List<String> bloomFilterColumns, TableTaskConfig taskConfig)
+      List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
+      String sortedColumn)
       throws Exception {
     TableConfig tableConfig =
         getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, bloomFilterColumns, taskConfig);
+            invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
 
     if (!isUsingNewConfigFormat()) {
       sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonConfigString());
@@ -327,11 +329,12 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
       String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      List<String> bloomFilterColumns, TableTaskConfig taskConfig)
+      List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
+      String sortedColumn)
       throws Exception {
     TableConfig tableConfig =
         getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, bloomFilterColumns, taskConfig);
+            invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
 
     if (!isUsingNewConfigFormat()) {
       sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonConfigString());
@@ -342,12 +345,14 @@ public abstract class ClusterTest extends ControllerTest {
 
   private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
       String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig) {
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig,
+      SegmentPartitionConfig segmentPartitionConfig, String sortedColumn) {
     return new TableConfig.Builder(Helix.TableType.OFFLINE).setTableName(tableName).setTimeColumnName(timeColumnName)
         .setTimeType(timeType).setNumReplicas(3).setBrokerTenant(brokerTenant).setServerTenant(serverTenant)
         .setLoadMode(loadMode).setSegmentVersion(segmentVersion.toString())
         .setInvertedIndexColumns(invertedIndexColumns).setBloomFilterColumns(bloomFilterColumns)
-        .setTaskConfig(taskConfig).build();
+        .setTaskConfig(taskConfig).setSegmentPartitionConfig(segmentPartitionConfig).setSortedColumn(sortedColumn)
+        .build();
   }
 
   protected void dropOfflineTable(String tableName)
@@ -481,10 +486,10 @@ public abstract class ClusterTest extends ControllerTest {
       String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
       String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
       List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
-      TableTaskConfig taskConfig, String streamConsumerFactoryName)
+      TableTaskConfig taskConfig, String streamConsumerFactoryName, SegmentPartitionConfig segmentPartitionConfig)
       throws Exception {
     addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
-        invertedIndexColumns, bloomFilterColumns, taskConfig);
+        invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
     addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
         timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
         bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index 8c9ffe5..fc14789 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -161,7 +161,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
    */
   private void setupOfflineTable(String table) throws Exception {
     _realtimeTableConfig = null;
-    addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null);
+    addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
     completeTableConfiguration();
   }
 
@@ -184,7 +184,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null);
+    addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
     completeTableConfiguration();
 
     ExecutorService executor = Executors.newCachedThreadPool();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 36e083c..5bb9b65 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -19,26 +19,48 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.hadoop.job.JobConfigConstants;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
 import org.apache.pinot.hadoop.job.SegmentCreationJob;
 import org.apache.pinot.hadoop.job.SegmentTarPushJob;
 import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
 
 public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
   private static final int NUM_BROKERS = 1;
   private static final int NUM_SERVERS = 1;
 
@@ -66,7 +88,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
 
     // Start the MR Yarn cluster
     final Configuration conf = new Configuration();
-    _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1);
+    _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
     _mrCluster.init(conf);
     _mrCluster.start();
 
@@ -93,7 +115,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
 
     // Create the table
     addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
-        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig());
+        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(), getSortedColumn());
 
     // Generate and push Pinot segments from Hadoop
     generateAndPushSegmentsFromHadoop();
@@ -154,6 +176,25 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost());
     properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
 
+    Properties preComputeProperties = new Properties();
+    preComputeProperties.putAll(properties);
+    preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
+    preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
+
+    preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath());
+    preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath());
+    properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath());
+
+    // Run segment pre-processing job
+    SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties);
+    Configuration preComputeConfig = _mrCluster.getConfig();
+    segmentPreprocessingJob.setConf(preComputeConfig);
+    segmentPreprocessingJob.run();
+    LOGGER.info("Segment preprocessing job finished.");
+
+    // Verify partitioning and sorting.
+    verifyPreprocessingJob(preComputeConfig);
+
     // Run segment creation job
     SegmentCreationJob creationJob = new SegmentCreationJob(properties);
     Configuration config = _mrCluster.getConfig();
@@ -165,4 +206,46 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     pushJob.setConf(_mrCluster.getConfig());
     pushJob.run();
   }
+
+  private void verifyPreprocessingJob(Configuration preComputeConfig) throws IOException {
+    // Fetch partitioning config and sorting config.
+    SegmentPartitionConfig segmentPartitionConfig = getSegmentPartitionConfig();
+    Map.Entry<String, ColumnPartitionConfig>
+        entry = segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+    String partitionColumn = entry.getKey();
+    String partitionFunctionString = entry.getValue().getFunctionName();
+    int numPartitions = entry.getValue().getNumPartitions();
+    PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionString, numPartitions);
+    String sortedColumn = getSortedColumn();
+
+    // Get output files.
+    FileSystem fileSystem = FileSystem.get(preComputeConfig);
+    FileStatus[] fileStatuses = fileSystem.listStatus(new Path(_preprocessingDir.getPath()));
+    Assert.assertEquals(fileStatuses.length, numPartitions, "Number of output file should be the same as the number of partitions.");
+
+    Set<Integer> partitionIdSet = new HashSet<>();
+    Object previousObject;
+    for (FileStatus fileStatus : fileStatuses) {
+      Path avroFile = fileStatus.getPath();
+      DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>());
+
+      // Reset hash set and previous object
+      partitionIdSet.clear();
+      previousObject = null;
+      while (dataFileStream.hasNext()) {
+        GenericRecord genericRecord = dataFileStream.next();
+        partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn)));
+        Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should be the same within a file.");
+        org.apache.avro.Schema sortedColumnSchema = genericRecord.getSchema().getField(sortedColumn).schema();
+        Object currentObject = genericRecord.get(sortedColumn);
+        if (previousObject == null) {
+          previousObject = currentObject;
+          continue;
+        }
+        // The values of sorted column should be sorted in ascending order.
+        Assert.assertTrue(GenericData.get().compare(previousObject, currentObject, sortedColumnSchema) <= 0);
+        previousObject = currentObject;
+      }
+    }
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index f9ca74a..875969f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -135,7 +135,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
         TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
-        getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
+        getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
 
     completeTableConfiguration();
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index 6e5474f..c1f77d8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -295,7 +295,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       String timeType = outgoingTimeUnit.toString();
       addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
           _realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
-          _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName());
+          _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName(), null);
 
       // Upload all segments
       uploadSegments(_tarDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 8d9d144..75a69a8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -120,7 +120,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Create the table
     addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
-        getBloomFilterIndexColumns(), getTaskConfig());
+        getBloomFilterIndexColumns(), getTaskConfig(), null, null);
 
     completeTableConfiguration();
 
@@ -225,7 +225,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Update table config and trigger reload
     updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
-        UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig());
+        UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig(), null, null);
 
     updateTableConfiguration();
 
@@ -252,7 +252,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Update table config and trigger reload
     updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
-        UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig());
+        UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig(), null, null);
 
     updateTableConfiguration();
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 438655a..ad50f05 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -86,9 +86,9 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
     taskTypeConfigsMap.put(TestTaskGenerator.TASK_TYPE, Collections.emptyMap());
     taskConfig.setTaskTypeConfigsMap(taskTypeConfigsMap);
-    addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig);
-    addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig);
-    addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null);
+    addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
+    addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
+    addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null);
 
     _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
diff --git a/pom.xml b/pom.xml
index 95bd02c..4e27925 100644
--- a/pom.xml
+++ b/pom.xml
@@ -668,6 +668,12 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-mapred</artifactId>
+        <version>1.7.4</version>
+        <classifier>hadoop2</classifier>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.9.13</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org