Posted to commits@pinot.apache.org by je...@apache.org on 2019/07/23 19:52:50 UTC

[incubator-pinot] branch inputPathCompat created (now a4d2754)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch inputPathCompat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at a4d2754  Compatible resizing

This branch includes the following new commits:

     new a4d2754  Compatible resizing

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Compatible resizing

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch inputPathCompat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a4d2754778715e0c3182d93fe267df26c423830f
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jul 23 12:52:34 2019 -0700

    Compatible resizing
---
 .../name/NormalizedDateSegmentNameGenerator.java   |  18 +-
 .../pinot/hadoop/job/InternalConfigConstants.java  |  21 ++
 .../pinot/hadoop/job/SegmentPreprocessingJob.java  | 216 ++++++++++++++++++++-
 .../job/mappers/SegmentPreprocessingMapper.java    |  41 +++-
 .../job/partitioners/GenericPartitioner.java       |   8 +-
 ...mentBuildPushOfflineClusterIntegrationTest.java |  87 ++++++++-
 6 files changed, 376 insertions(+), 15 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index eadd3d9..ff1ecc1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -32,16 +32,20 @@ import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
  * Segment name generator that normalizes the date to human readable format.
  */
 public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator {
-  private final String _segmentNamePrefix;
-  private final boolean _excludeSequenceId;
-  private final boolean _appendPushType;
+  private String _segmentNamePrefix;
+  private boolean _excludeSequenceId;
+  private boolean _appendPushType;
 
   // For APPEND tables
-  private final SimpleDateFormat _outputSDF;
+  private SimpleDateFormat _outputSDF;
   // For EPOCH time format
-  private final TimeUnit _inputTimeUnit;
+  private TimeUnit _inputTimeUnit;
   // For SIMPLE_DATE_FORMAT time format
-  private final SimpleDateFormat _inputSDF;
+  private SimpleDateFormat _inputSDF;
+
+  public NormalizedDateSegmentNameGenerator(@Nullable String pushFrequency, @Nullable TimeUnit timeType, @Nullable String timeFormat) {
+    this("myTable", null, false, "APPEND", pushFrequency, timeType, timeFormat);
+  }
 
   public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix,
       boolean excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency, @Nullable TimeUnit timeType,
@@ -98,7 +102,7 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
    * @param timeValue Time value
    * @return Normalized date string
    */
-  private String getNormalizedDate(Object timeValue) {
+  public String getNormalizedDate(Object timeValue) {
     if (_inputTimeUnit != null) {
       return _outputSDF.format(new Date(_inputTimeUnit.toMillis(Long.parseLong(timeValue.toString()))));
     } else {
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
new file mode 100644
index 0000000..d674742
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -0,0 +1,21 @@
+package org.apache.pinot.hadoop.job;
+
+/**
+ * Internal-only constants for Hadoop MapReduce jobs. These constants are propagated across different segment creation
+ * jobs. They are not meant to be set externally.
+ */
+public class InternalConfigConstants {
+  public static final String TIME_COLUMN_CONFIG = "time.column";
+  public static final String TIME_COLUMN_VALUE = "time.column.value";
+  public static final String IS_APPEND = "is.append";
+  public static final String SEGMENT_PUSH_FREQUENCY = "segment.push.frequency";
+  public static final String SEGMENT_TIME_TYPE = "segment.time.type";
+  public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
+
+  // Partitioning configs
+  public static final String PARTITION_COLUMN_CONFIG = "partition.column";
+  public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+  public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+
+  public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index a3f3901..364ff08 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,6 +20,8 @@ package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -30,21 +32,43 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableCustomConfig;
+import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
+import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
+import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
 import org.apache.pinot.hadoop.utils.PushLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+import static org.apache.hadoop.security.UserGroupInformation.*;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
 
 /**
  * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
@@ -79,6 +103,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
   private final String _defaultPermissionsMask;
 
   private TableConfig _tableConfig;
+  private org.apache.pinot.common.data.Schema _schema;
   protected FileSystem _fileSystem;
 
   public SegmentPreprocessingJob(final Properties properties) {
@@ -137,8 +162,195 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
 
   public void run()
       throws Exception {
-    _logger.info("Pre-processing job is disabled.");
-    return;
+    // TODO: Remove once the job is ready
+    _enablePartitioning = false;
+    _enableSorting = false;
+    _enableResizing = false;
+
+    if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
+      _logger.info("Pre-processing job is disabled.");
+      return;
+    } else {
+      _logger.info("Starting {}", getClass().getSimpleName());
+    }
+
+    _fileSystem = FileSystem.get(_conf);
+    final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
+
+    if (_fileSystem.exists(_preprocessedOutputDir)) {
+      _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
+      _fileSystem.delete(_preprocessedOutputDir, true);
+    }
+    JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
+
+    // If push locations, table config, and schema are not configured, this does not necessarily mean that segments
+    // cannot be created. We should allow the user to go to the next step rather than failing the job.
+    if (_pushLocations.isEmpty()) {
+      _logger.error("Push locations cannot be empty. "
+          + "They are needed to get the table config and schema needed for this step. Skipping pre-processing");
+      return;
+    }
+
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      _tableConfig = controllerRestApi.getTableConfig();
+      _schema = controllerRestApi.getSchema();
+    }
+
+    if (_tableConfig == null) {
+      _logger.error("Table config cannot be null. Skipping pre-processing");
+      return;
+    }
+
+    if (_schema == null) {
+      _logger.error("Schema cannot be null. Skipping pre-processing");
+      return;
+    }
+
+    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+    _logger.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(_conf);
+
+    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
+    // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one,
+    // the job should be disabled, as we should not resize for these use cases. Therefore, set the time column name
+    // and a sample value here.
+    if ("APPEND".equalsIgnoreCase(validationConfig.getSegmentPushType())) {
+      job.getConfiguration().set(IS_APPEND, "true");
+      String timeColumnName = _schema.getTimeFieldSpec().getName();
+      job.getConfiguration().set(TIME_COLUMN_CONFIG, timeColumnName);
+      job.getConfiguration().set(SEGMENT_TIME_TYPE, validationConfig.getTimeType().toString());
+      job.getConfiguration().set(SEGMENT_TIME_FORMAT, _schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat());
+      job.getConfiguration().set(SEGMENT_PUSH_FREQUENCY, validationConfig.getSegmentPushFrequency());
+      DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputDataPath.get(0));
+      job.getConfiguration().set(TIME_COLUMN_VALUE, String.valueOf(dataStreamReader.next().get(timeColumnName)));
+      dataStreamReader.close();
+    }
+
+    if (_enablePartitioning) {
+      fetchPartitioningConfig();
+      _logger.info("{}: {}", PARTITION_COLUMN_CONFIG, _partitionColumn);
+      _logger.info("{}: {}", NUM_PARTITIONS_CONFIG, _numberOfPartitions);
+      _logger.info("{}: {}", PARTITION_FUNCTION_CONFIG, _partitionFunction);
+    }
+
+    if (_enableSorting) {
+      fetchSortingConfig();
+      _logger.info("{}: {}", SORTED_COLUMN_CONFIG, _sortedColumn);
+    }
+
+    if (_enableResizing) {
+      fetchResizingConfig();
+      _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
+    }
+
+    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+    // Turn this on so that the class paths specified by the user are used first.
+    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+    // Turn this off since we don't need an empty file in the output directory
+    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+    job.setJarByClass(SegmentPreprocessingJob.class);
+
+    String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+    if (hadoopTokenFileLocation != null) {
+      job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+    }
+
+    // Schema configs.
+    Schema schema = getSchema(inputDataPath.get(0));
+    _logger.info("Schema is: {}", schema.toString(true));
+
+    // Validates configs against schema.
+    validateConfigsAgainstSchema(schema);
+
+    // Mapper configs.
+    job.setMapperClass(SegmentPreprocessingMapper.class);
+    job.setMapOutputKeyClass(AvroKey.class);
+    job.setMapOutputValueClass(AvroValue.class);
+    job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
+
+    // Reducer configs.
+    job.setReducerClass(SegmentPreprocessingReducer.class);
+    job.setOutputKeyClass(AvroKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
+    AvroMultipleOutputs.setCountersEnabled(job, true);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+
+    // Input and output paths.
+    FileInputFormat.setInputPaths(job, _inputSegmentDir);
+    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+    _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
+
+    // Set up mapper output key
+    Set<Schema.Field> fieldSet = new HashSet<>();
+
+    // Partition configs.
+    int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
+    if (_partitionColumn != null) {
+      job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
+      job.setPartitionerClass(GenericPartitioner.class);
+      job.getConfiguration().set(PARTITION_COLUMN_CONFIG, _partitionColumn);
+      if (_partitionFunction != null) {
+        job.getConfiguration().set(PARTITION_FUNCTION_CONFIG, _partitionFunction);
+      }
+      job.getConfiguration().set(NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
+    } else {
+      if (_numberOfOutputFiles > 0) {
+        numReduceTasks = _numberOfOutputFiles;
+      }
+      // Partitioning is disabled. Add the hashcode as one of the fields in the mapper output key
+      // so that all the rows can be spread evenly.
+      addHashCodeField(fieldSet);
+    }
+    setMaxNumRecordsConfigIfSpecified(job);
+    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
+
+    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    job.setNumReduceTasks(numReduceTasks);
+
+    // Sort config.
+    if (_sortedColumn != null) {
+      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
+      job.getConfiguration().set(SORTED_COLUMN_CONFIG, _sortedColumn);
+
+      addSortedColumnField(schema, fieldSet);
+    } else {
+      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
+      addHashCodeField(fieldSet);
+    }
+
+    // Creates a wrapper for the schema of output key in mapper.
+    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
+    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
+    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+
+    AvroJob.setInputKeySchema(job, schema);
+    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
+    AvroJob.setMapOutputValueSchema(job, schema);
+    AvroJob.setOutputKeySchema(job, schema);
+
+    // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to the
+    // distributed cache ourselves. Take a look at how addFilesToDistributedCache is implemented
+    // to understand what it does.
+    _logger.info("HDFS class path: " + _pathToDependencyJar);
+    if (_pathToDependencyJar != null) {
+      _logger.info("Copying jars locally.");
+      JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+    } else {
+      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+    }
+
+    long startTime = System.currentTimeMillis();
+    // Submit the job for execution.
+    job.waitForCompletion(true);
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("Job failed : " + job);
+    }
+
+    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
   }
 
   @Nullable
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index 2c4d012..508ce62 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -29,24 +30,49 @@ import org.apache.avro.mapreduce.AvroJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
 import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
 
 
 public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
   private String _sortedColumn = null;
+  private String _timeColumn = null;
   private Schema _outputKeySchema;
   private Schema _outputSchema;
   private boolean _enablePartition = false;
+  private String _sampleNormalizedTimeColumnValue = null;
+  private NormalizedDateSegmentNameGenerator _normalizedDateSegmentNameGenerator = null;
+  private boolean _isAppend = false;
 
   @Override
   public void setup(final Context context) {
     Configuration configuration = context.getConfiguration();
 
-    String sortedColumn = configuration.get("sorted.column");
+    _isAppend = "true".equalsIgnoreCase(configuration.get(IS_APPEND));
+
+    if (_isAppend) {
+      // Get time column name
+      _timeColumn = configuration.get(TIME_COLUMN_CONFIG);
+
+      // Get sample time column value
+      String timeColumnValue = configuration.get(TIME_COLUMN_VALUE);
+
+      String pushFrequency = configuration.get(SEGMENT_PUSH_FREQUENCY);
+      String timeType = configuration.get(SEGMENT_TIME_TYPE);
+      String timeFormat = configuration.get(SEGMENT_TIME_FORMAT);
+      TimeUnit timeUnit = TimeUnit.valueOf(timeType);
+      // Normalize time column value
+      _normalizedDateSegmentNameGenerator = new NormalizedDateSegmentNameGenerator(pushFrequency, timeUnit, timeFormat);
+      _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+    }
+
+    String sortedColumn = configuration.get(SORTED_COLUMN_CONFIG);
     // Logging the configs for the mapper
     LOGGER.info("Sorted Column: " + sortedColumn);
     if (sortedColumn != null) {
@@ -61,6 +87,19 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
   @Override
   public void map(AvroKey<GenericRecord> record, NullWritable value, final Context context)
       throws IOException, InterruptedException {
+
+    if (_isAppend) {
+      // Normalize time column value and check against sample value
+      String timeColumnValue = String.valueOf(record.datum().get(_timeColumn));
+      String normalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+
+      if (!normalizedTimeColumnValue.equals(_sampleNormalizedTimeColumnValue)) {
+        // TODO: Create a custom exception and gracefully catch this exception outside, changing what the input path
+        // to segment creation should be
+        throw new IllegalArgumentException("Normalized time value " + normalizedTimeColumnValue + " does not match the sampled value " + _sampleNormalizedTimeColumnValue);
+      }
+    }
+
     final GenericRecord inputRecord = record.datum();
     final Schema schema = inputRecord.getSchema();
     Preconditions.checkArgument(_outputSchema.equals(schema), "The schema of all avro files should be the same!");
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
index 43d1396..ca7e312 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
@@ -27,6 +27,8 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
 
 public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable {
 
@@ -39,10 +41,10 @@ public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecor
   @Override
   public void setConf(Configuration conf) {
     _configuration = conf;
-    _partitionColumn = _configuration.get("partition.column");
-    _numPartitions = Integer.parseInt(_configuration.get("num.partitions"));
+    _partitionColumn = _configuration.get(PARTITION_COLUMN_CONFIG);
+    _numPartitions = Integer.parseInt(_configuration.get(NUM_PARTITIONS_CONFIG));
     _partitionFunction =
-        PartitionFunctionFactory.getPartitionFunction(_configuration.get("partition.function", null), _numPartitions);
+        PartitionFunctionFactory.getPartitionFunction(_configuration.get(PARTITION_FUNCTION_CONFIG, null), _numPartitions);
 
     LOGGER.info("The partition function is: " + _partitionFunction.getClass().getName());
     LOGGER.info("The partition column is: " + _partitionColumn);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 49903ef..1bc48c3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -19,26 +19,48 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.hadoop.job.JobConfigConstants;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
 import org.apache.pinot.hadoop.job.SegmentCreationJob;
 import org.apache.pinot.hadoop.job.SegmentTarPushJob;
 import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
 
 public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
   private static final int NUM_BROKERS = 1;
   private static final int NUM_SERVERS = 1;
 
@@ -66,7 +88,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
 
     // Start the MR Yarn cluster
     final Configuration conf = new Configuration();
-    _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1);
+    _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
     _mrCluster.init(conf);
     _mrCluster.start();
 
@@ -93,7 +115,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
 
     // Create the table
     addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
-        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(), getSortedColumn());
 
     // Generate and push Pinot segments from Hadoop
     generateAndPushSegmentsFromHadoop();
@@ -161,6 +183,25 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost());
     properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
 
+    Properties preComputeProperties = new Properties();
+    preComputeProperties.putAll(properties);
+    preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
+    preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
+
+    preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath());
+    preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath());
+    properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath());
+
+    // Run segment pre-processing job
+    SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties);
+    Configuration preComputeConfig = _mrCluster.getConfig();
+    segmentPreprocessingJob.setConf(preComputeConfig);
+    segmentPreprocessingJob.run();
+    LOGGER.info("Segment preprocessing job finished.");
+
+    // Verify partitioning and sorting.
+    verifyPreprocessingJob(preComputeConfig);
+
     // Run segment creation job
     SegmentCreationJob creationJob = new SegmentCreationJob(properties);
     Configuration config = _mrCluster.getConfig();
@@ -172,4 +213,46 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     pushJob.setConf(_mrCluster.getConfig());
     pushJob.run();
   }
+
+  private void verifyPreprocessingJob(Configuration preComputeConfig) throws IOException {
+    // Fetch partitioning config and sorting config.
+    SegmentPartitionConfig segmentPartitionConfig = getSegmentPartitionConfig();
+    Map.Entry<String, ColumnPartitionConfig>
+        entry = segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+    String partitionColumn = entry.getKey();
+    String partitionFunctionString = entry.getValue().getFunctionName();
+    int numPartitions = entry.getValue().getNumPartitions();
+    PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionString, numPartitions);
+    String sortedColumn = getSortedColumn();
+
+    // Get output files.
+    FileSystem fileSystem = FileSystem.get(preComputeConfig);
+    FileStatus[] fileStatuses = fileSystem.listStatus(new Path(_preprocessingDir.getPath()));
+    Assert.assertEquals(fileStatuses.length, numPartitions, "Number of output files should be the same as the number of partitions.");
+
+    Set<Integer> partitionIdSet = new HashSet<>();
+    Object previousObject;
+    for (FileStatus fileStatus : fileStatuses) {
+      Path avroFile = fileStatus.getPath();
+      DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>());
+
+      // Reset hash set and previous object
+      partitionIdSet.clear();
+      previousObject = null;
+      while (dataFileStream.hasNext()) {
+        GenericRecord genericRecord = dataFileStream.next();
+        partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn)));
+        Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should be the same within a file.");
+        org.apache.avro.Schema sortedColumnSchema = genericRecord.getSchema().getField(sortedColumn).schema();
+        Object currentObject = genericRecord.get(sortedColumn);
+        if (previousObject == null) {
+          previousObject = currentObject;
+          continue;
+        }
+        // The values of sorted column should be sorted in ascending order.
+        Assert.assertTrue(GenericData.get().compare(previousObject, currentObject, sortedColumnSchema) <= 0);
+        previousObject = currentObject;
+      }
+    }
+  }
 }
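
For readers tracing the data flow in this patch: the keys defined in InternalConfigConstants are written into the Hadoop
job configuration by SegmentPreprocessingJob and read back by SegmentPreprocessingMapper and GenericPartitioner. The
sketch below illustrates that round trip using only the constants introduced above; the standalone class name, the column
name, and the literal values are illustrative assumptions, not part of the commit.

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;

    // Minimal sketch (not part of the commit) of how the internal config keys travel from job to mapper.
    public class ConfigPropagationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Producer side (see SegmentPreprocessingJob.run() above): publish the append-table settings.
        conf.set(IS_APPEND, "true");
        conf.set(TIME_COLUMN_CONFIG, "daysSinceEpoch");  // hypothetical time column name
        conf.set(SEGMENT_TIME_TYPE, "DAYS");             // must be a java.util.concurrent.TimeUnit name
        conf.set(SEGMENT_TIME_FORMAT, "EPOCH");
        conf.set(SEGMENT_PUSH_FREQUENCY, "daily");

        // Consumer side (see SegmentPreprocessingMapper.setup() above): read the settings back.
        boolean isAppend = "true".equalsIgnoreCase(conf.get(IS_APPEND));
        String timeColumn = conf.get(TIME_COLUMN_CONFIG);
        System.out.println("isAppend=" + isAppend + ", timeColumn=" + timeColumn);
      }
    }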


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org