Posted to commits@pinot.apache.org by je...@apache.org on 2019/07/23 20:07:24 UTC

[incubator-pinot] 01/01: Time-aware resizing

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch inputPathCompat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4a0d4b11c4411b1b1179824bbdbf8ac94640143d
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jul 23 13:00:07 2019 -0700

    Time-aware resizing
---
 .../name/NormalizedDateSegmentNameGenerator.java   |  18 +-
 .../pinot/hadoop/job/InternalConfigConstants.java  |  21 ++
 .../pinot/hadoop/job/SegmentPreprocessingJob.java  | 216 ++++++++++++++++++++-
 .../job/mappers/SegmentPreprocessingMapper.java    |  41 +++-
 .../job/partitioners/GenericPartitioner.java       |   8 +-
 .../tests/BaseClusterIntegrationTest.java          |   1 +
 ...mentBuildPushOfflineClusterIntegrationTest.java |  73 ++++++-
 7 files changed, 363 insertions(+), 15 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index eadd3d9..ff1ecc1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -32,16 +32,20 @@ import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
  * Segment name generator that normalizes the date to human readable format.
  */
 public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator {
-  private final String _segmentNamePrefix;
-  private final boolean _excludeSequenceId;
-  private final boolean _appendPushType;
+  private String _segmentNamePrefix;
+  private boolean _excludeSequenceId;
+  private boolean _appendPushType;
 
   // For APPEND tables
-  private final SimpleDateFormat _outputSDF;
+  private SimpleDateFormat _outputSDF;
   // For EPOCH time format
-  private final TimeUnit _inputTimeUnit;
+  private TimeUnit _inputTimeUnit;
   // For SIMPLE_DATE_FORMAT time format
-  private final SimpleDateFormat _inputSDF;
+  private SimpleDateFormat _inputSDF;
+
+  public NormalizedDateSegmentNameGenerator(@Nullable String pushFrequency, @Nullable TimeUnit timeType, @Nullable String timeFormat) {
+    this("myTable", null, false, "APPEND", pushFrequency, timeType, timeFormat);
+  }
 
   public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix,
       boolean excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency, @Nullable TimeUnit timeType,
@@ -98,7 +102,7 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
    * @param timeValue Time value
    * @return Normalized date string
    */
-  private String getNormalizedDate(Object timeValue) {
+  public String getNormalizedDate(Object timeValue) {
     if (_inputTimeUnit != null) {
       return _outputSDF.format(new Date(_inputTimeUnit.toMillis(Long.parseLong(timeValue.toString()))));
     } else {
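
A minimal, self-contained sketch (values and push frequency are hypothetical) of how the new three-argument constructor and the now-public getNormalizedDate() are expected to behave for an EPOCH time format; the exact output pattern is decided by the full constructor this convenience constructor delegates to.

import java.util.concurrent.TimeUnit;

import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;

public class NormalizedDateExample {
  public static void main(String[] args) {
    // EPOCH time format with millisecond granularity; a "DAILY" push frequency is assumed to
    // normalize values down to a day-level date string.
    NormalizedDateSegmentNameGenerator generator =
        new NormalizedDateSegmentNameGenerator("DAILY", TimeUnit.MILLISECONDS, "EPOCH");

    String day1 = generator.getNormalizedDate(1563840000000L);  // 2019-07-23T00:00:00Z
    String day2 = generator.getNormalizedDate(1563926400000L);  // 2019-07-24T00:00:00Z

    // Two records that normalize to different values span more than one time unit,
    // which is exactly the condition the new preprocessing mapper rejects.
    System.out.println(day1.equals(day2) ? "same time unit" : "different time units");
  }
}
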
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
new file mode 100644
index 0000000..d674742
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -0,0 +1,21 @@
+package org.apache.pinot.hadoop.job;
+
+/**
+ * Internal-only constants for Hadoop MapReduce jobs. These constants are propagated across different segment creation
+ * jobs. They are not meant to be set externally.
+ */
+public class InternalConfigConstants {
+  public static final String TIME_COLUMN_CONFIG = "time.column";
+  public static final String TIME_COLUMN_VALUE = "time.column.value";
+  public static final String IS_APPEND = "is.append";
+  public static final String SEGMENT_PUSH_FREQUENCY = "segment.push.frequency";
+  public static final String SEGMENT_TIME_TYPE = "segment.time.type";
+  public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
+
+  // Partitioning configs
+  public static final String PARTITION_COLUMN_CONFIG = "partition.column";
+  public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+  public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+
+  public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+}
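
A minimal sketch of how these internal constants travel (column names are hypothetical): the preprocessing job writes them into the Hadoop Configuration, and the mapper/partitioner read the same keys back in setup()/setConf(), so they never need to be exposed as user-facing job properties.

import org.apache.hadoop.conf.Configuration;

import org.apache.pinot.hadoop.job.InternalConfigConstants;

public class InternalConfigPropagationExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Job side: derive the values from the table config / schema and stash them in the job config.
    conf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, "memberId");          // hypothetical column
    conf.set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, "8");
    conf.set(InternalConfigConstants.SORTED_COLUMN_CONFIG, "creationTimeMillis");   // hypothetical column

    // Task side: read the same keys back inside the mapper or partitioner.
    String partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
    System.out.println("Partitioning on " + partitionColumn + " into " + numPartitions + " partitions");
  }
}
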
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index a3f3901..364ff08 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,6 +20,8 @@ package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -30,21 +32,43 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableCustomConfig;
+import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
+import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
+import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
 import org.apache.pinot.hadoop.utils.PushLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+import static org.apache.hadoop.security.UserGroupInformation.*;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
 
 /**
  * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
@@ -79,6 +103,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
   private final String _defaultPermissionsMask;
 
   private TableConfig _tableConfig;
+  private org.apache.pinot.common.data.Schema _schema;
   protected FileSystem _fileSystem;
 
   public SegmentPreprocessingJob(final Properties properties) {
@@ -137,8 +162,195 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
 
   public void run()
       throws Exception {
-    _logger.info("Pre-processing job is disabled.");
-    return;
+    // TODO: Remove once the job is ready
+    _enablePartitioning = false;
+    _enableSorting = false;
+    _enableResizing = false;
+
+    if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
+      _logger.info("Pre-processing job is disabled.");
+      return;
+    } else {
+      _logger.info("Starting {}", getClass().getSimpleName());
+    }
+
+    _fileSystem = FileSystem.get(_conf);
+    final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
+
+    if (_fileSystem.exists(_preprocessedOutputDir)) {
+      _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
+      _fileSystem.delete(_preprocessedOutputDir, true);
+    }
+    JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
+
+    // If push locations, table config, and schema are not configured, this does not necessarily mean that segments
+    // cannot be created. We should allow the user to go to the next step rather than failing the job.
+    if (_pushLocations.isEmpty()) {
+      _logger.error("Push locations cannot be empty. "
+          + "They are needed to get the table config and schema needed for this step. Skipping pre-processing");
+      return;
+    }
+
+    try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+      _tableConfig = controllerRestApi.getTableConfig();
+      _schema = controllerRestApi.getSchema();
+    }
+
+    if (_tableConfig == null) {
+      _logger.error("Table config cannot be null. Skipping pre-processing");
+      return;
+    }
+
+    if (_schema == null) {
+      _logger.error("Schema cannot be null. Skipping pre-processing");
+      return;
+    }
+
+    SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+    _logger.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(_conf);
+
+    // TODO: Serialize and deserialize validation config by creating toJson and fromJson
+    // For APPEND use cases, each input file must contain values from a single time unit. If a file spans more than
+    // one time unit, the job should be disabled, since we should not resize such inputs. Set the time column name
+    // and a sample time column value so that the mapper can enforce this.
+    if (validationConfig.getSegmentPushType().equalsIgnoreCase("APPEND")) {
+      job.getConfiguration().set(IS_APPEND, "true");
+      String timeColumnName = _schema.getTimeFieldSpec().getName();
+      job.getConfiguration().set(TIME_COLUMN_CONFIG, timeColumnName);
+      job.getConfiguration().set(SEGMENT_TIME_TYPE, validationConfig.getTimeType().toString());
+      job.getConfiguration().set(SEGMENT_TIME_FORMAT, _schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat());
+      job.getConfiguration().set(SEGMENT_PUSH_FREQUENCY, validationConfig.getSegmentPushFrequency());
+      DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputDataPath.get(0));
+      job.getConfiguration().set(TIME_COLUMN_VALUE, dataStreamReader.next().get(timeColumnName).toString());
+      dataStreamReader.close();
+    }
+
+    if (_enablePartitioning) {
+      fetchPartitioningConfig();
+      _logger.info("{}: {}", PARTITION_COLUMN_CONFIG, _partitionColumn);
+      _logger.info("{}: {}", NUM_PARTITIONS_CONFIG, _numberOfPartitions);
+      _logger.info("{}: {}", PARTITION_FUNCTION_CONFIG, _partitionColumn);
+    }
+
+    if (_enableSorting) {
+      fetchSortingConfig();
+      _logger.info("{}: {}", SORTED_COLUMN_CONFIG, _sortedColumn);
+    }
+
+    if (_enableResizing) {
+      fetchResizingConfig();
+      _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
+    }
+
+    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+    // Turn this on to always use the class paths that the user specifies first.
+    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+    // Turn this off since we don't need an empty file in the output directory
+    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+    job.setJarByClass(SegmentPreprocessingJob.class);
+
+    String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+    if (hadoopTokenFileLocation != null) {
+      job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+    }
+
+    // Schema configs.
+    Schema schema = getSchema(inputDataPath.get(0));
+    _logger.info("Schema is: {}", schema.toString(true));
+
+    // Validates configs against schema.
+    validateConfigsAgainstSchema(schema);
+
+    // Mapper configs.
+    job.setMapperClass(SegmentPreprocessingMapper.class);
+    job.setMapOutputKeyClass(AvroKey.class);
+    job.setMapOutputValueClass(AvroValue.class);
+    job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
+
+    // Reducer configs.
+    job.setReducerClass(SegmentPreprocessingReducer.class);
+    job.setOutputKeyClass(AvroKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
+    AvroMultipleOutputs.setCountersEnabled(job, true);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+
+    // Input and output paths.
+    FileInputFormat.setInputPaths(job, _inputSegmentDir);
+    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+    _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
+
+    // Set up mapper output key
+    Set<Schema.Field> fieldSet = new HashSet<>();
+
+    // Partition configs.
+    int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
+    if (_partitionColumn != null) {
+      job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
+      job.setPartitionerClass(GenericPartitioner.class);
+      job.getConfiguration().set(PARTITION_COLUMN_CONFIG, _partitionColumn);
+      if (_partitionFunction != null) {
+        job.getConfiguration().set(PARTITION_FUNCTION_CONFIG, _partitionFunction);
+      }
+      job.getConfiguration().set(NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
+    } else {
+      if (_numberOfOutputFiles > 0) {
+        numReduceTasks = _numberOfOutputFiles;
+      }
+      // Partitioning is disabled. Add hashcode as one of the fields in the mapper output key
+      // so that all the rows can be spread evenly across reducers.
+      addHashCodeField(fieldSet);
+    }
+    setMaxNumRecordsConfigIfSpecified(job);
+    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
+
+    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    job.setNumReduceTasks(numReduceTasks);
+
+    // Sort config.
+    if (_sortedColumn != null) {
+      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
+      job.getConfiguration().set(SORTED_COLUMN_CONFIG, _sortedColumn);
+
+      addSortedColumnField(schema, fieldSet);
+    } else {
+      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
+      addHashCodeField(fieldSet);
+    }
+
+    // Creates a wrapper for the schema of output key in mapper.
+    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
+    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
+    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+
+    AvroJob.setInputKeySchema(job, schema);
+    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
+    AvroJob.setMapOutputValueSchema(job, schema);
+    AvroJob.setOutputKeySchema(job, schema);
+
+    // Since we aren't extending AbstractHadoopJob, we need to add the job's dependency jars to the distributed
+    // cache ourselves; see JobPreparationHelper.addDepsJarToDistributedCacheHelper for how this is done.
+    _logger.info("HDFS class path: {}", _pathToDependencyJar);
+    if (_pathToDependencyJar != null) {
+      _logger.info("Copying jars locally.");
+      JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+    } else {
+      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+    }
+
+    long startTime = System.currentTimeMillis();
+    // Submit the job for execution.
+    job.waitForCompletion(true);
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("Job failed : " + job);
+    }
+
+    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
   }
 
   @Nullable
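
The APPEND branch above seeds TIME_COLUMN_VALUE from the first record of the first input file via getAvroReader(...). A minimal sketch of the equivalent logic, assuming that helper simply wraps FileSystem.open(...) in an Avro DataFileStream (the class and method names here are illustrative):

import java.io.IOException;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SampleTimeValueReader {
  // Open the Avro file on the job's FileSystem and return the time column value of its first
  // record as a string. Avro may hand back Utf8 or a numeric type, so convert via toString()
  // rather than casting.
  static String readSampleTimeValue(FileSystem fileSystem, Path avroFile, String timeColumnName)
      throws IOException {
    try (DataFileStream<GenericRecord> reader =
        new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<GenericRecord>())) {
      return reader.next().get(timeColumnName).toString();
    }
  }
}
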
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index 2c4d012..508ce62 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -29,24 +30,49 @@ import org.apache.avro.mapreduce.AvroJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
 import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
 
 
 public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
   private String _sortedColumn = null;
+  private String _timeColumn = null;
   private Schema _outputKeySchema;
   private Schema _outputSchema;
   private boolean _enablePartition = false;
+  private String _sampleNormalizedTimeColumnValue = null;
+  private NormalizedDateSegmentNameGenerator _normalizedDateSegmentNameGenerator = null;
+  private boolean _isAppend = false;
 
   @Override
   public void setup(final Context context) {
     Configuration configuration = context.getConfiguration();
 
-    String sortedColumn = configuration.get("sorted.column");
+    _isAppend = "true".equalsIgnoreCase(configuration.get(IS_APPEND));
+
+    if (_isAppend) {
+      // Get time column name
+      _timeColumn = configuration.get(TIME_COLUMN_CONFIG);
+
+      // Get sample time column value
+      String timeColumnValue = configuration.get(TIME_COLUMN_VALUE);
+
+      String pushFrequency = configuration.get(SEGMENT_PUSH_FREQUENCY);
+      String timeType = configuration.get(SEGMENT_TIME_TYPE);
+      String timeFormat = configuration.get(SEGMENT_TIME_FORMAT);
+      TimeUnit timeUnit = TimeUnit.valueOf(timeType);
+      // Normalize time column value
+      _normalizedDateSegmentNameGenerator = new NormalizedDateSegmentNameGenerator(pushFrequency, timeUnit, timeFormat);
+      _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+    }
+
+    String sortedColumn = configuration.get(SORTED_COLUMN_CONFIG);
     // Logging the configs for the mapper
     LOGGER.info("Sorted Column: " + sortedColumn);
     if (sortedColumn != null) {
@@ -61,6 +87,19 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
   @Override
   public void map(AvroKey<GenericRecord> record, NullWritable value, final Context context)
       throws IOException, InterruptedException {
+
+    if (_isAppend) {
+      // Normalize time column value and check against sample value
+      String timeColumnValue = record.datum().get(_timeColumn).toString();
+      String normalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+
+      if (!normalizedTimeColumnValue.equals(_sampleNormalizedTimeColumnValue)) {
+        // TODO: Create a custom exception and gracefully catch it outside, so that the input path used for
+        // segment creation can be adjusted accordingly
+        throw new IllegalArgumentException(String.format(
+            "Multiple time units found in the same file. Expected normalized time value: %s, found: %s",
+            _sampleNormalizedTimeColumnValue, normalizedTimeColumnValue));
+      }
+    }
+
     final GenericRecord inputRecord = record.datum();
     final Schema schema = inputRecord.getSchema();
     Preconditions.checkArgument(_outputSchema.equals(schema), "The schema of all avro files should be the same!");
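
The TODO in map(...) calls for a dedicated exception that the surrounding tooling could catch to adjust the segment-creation input path. One possible shape, purely illustrative and not part of this commit (the class name is hypothetical):

// Hypothetical exception type sketched from the TODO above; not part of this commit.
public class MultipleTimeUnitsInFileException extends RuntimeException {
  public MultipleTimeUnitsInFileException(String expected, String found) {
    super("Expected normalized time value " + expected + " but found " + found);
  }
}
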
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
index 43d1396..ca7e312 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
@@ -27,6 +27,8 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
 
 public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable {
 
@@ -39,10 +41,10 @@ public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecor
   @Override
   public void setConf(Configuration conf) {
     _configuration = conf;
-    _partitionColumn = _configuration.get("partition.column");
-    _numPartitions = Integer.parseInt(_configuration.get("num.partitions"));
+    _partitionColumn = _configuration.get(PARTITION_COLUMN_CONFIG);
+    _numPartitions = Integer.parseInt(_configuration.get(NUM_PARTITIONS_CONFIG));
     _partitionFunction =
-        PartitionFunctionFactory.getPartitionFunction(_configuration.get("partition.function", null), _numPartitions);
+        PartitionFunctionFactory.getPartitionFunction(_configuration.get(PARTITION_FUNCTION_CONFIG, null), _numPartitions);
 
     LOGGER.info("The partition function is: " + _partitionFunction.getClass().getName());
     LOGGER.info("The partition column is: " + _partitionColumn);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 67fabc6..2eb17f5 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,6 +71,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
   protected final File _avroDir = new File(_tempDir, "avroDir");
+  protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
   protected List<KafkaServerStartable> _kafkaStarters;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 49903ef..7b39366 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
@@ -31,14 +32,20 @@ import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.hadoop.job.JobConfigConstants;
 import org.apache.pinot.hadoop.job.SegmentCreationJob;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
 import org.apache.pinot.hadoop.job.SegmentTarPushJob;
 import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
 
 public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
   private static final int NUM_BROKERS = 1;
   private static final int NUM_SERVERS = 1;
 
@@ -66,7 +73,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
 
     // Start the MR Yarn cluster
     final Configuration conf = new Configuration();
-    _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1);
+    _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
     _mrCluster.init(conf);
     _mrCluster.start();
 
@@ -93,7 +100,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
 
     // Create the table
     addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
-        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+        getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(), getSortedColumn());
 
     // Generate and push Pinot segments from Hadoop
     generateAndPushSegmentsFromHadoop();
@@ -161,6 +168,25 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost());
     properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
 
+    Properties preComputeProperties = new Properties();
+    preComputeProperties.putAll(properties);
+    preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
+    preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
+
+    preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath());
+    preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath());
+//    properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath());
+
+    // Run segment pre-processing job
+    SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties);
+    Configuration preComputeConfig = _mrCluster.getConfig();
+    segmentPreprocessingJob.setConf(preComputeConfig);
+    segmentPreprocessingJob.run();
+    LOGGER.info("Segment preprocessing job finished.");
+
+    // Verify partitioning and sorting.
+    verifyPreprocessingJob(preComputeConfig);
+
     // Run segment creation job
     SegmentCreationJob creationJob = new SegmentCreationJob(properties);
     Configuration config = _mrCluster.getConfig();
@@ -172,4 +198,47 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
     pushJob.setConf(_mrCluster.getConfig());
     pushJob.run();
   }
+
+  private void verifyPreprocessingJob(Configuration preComputeConfig) throws IOException {
+    // TODO: Uncomment once this job is released
+//    // Fetch partitioning config and sorting config.
+//    SegmentPartitionConfig segmentPartitionConfig = getSegmentPartitionConfig();
+//    Map.Entry<String, ColumnPartitionConfig>
+//        entry = segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+//    String partitionColumn = entry.getKey();
+//    String partitionFunctionString = entry.getValue().getFunctionName();
+//    int numPartitions = entry.getValue().getNumPartitions();
+//    PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionString, numPartitions);
+//    String sortedColumn = getSortedColumn();
+//
+//    // Get output files.
+//    FileSystem fileSystem = FileSystem.get(preComputeConfig);
+//    FileStatus[] fileStatuses = fileSystem.listStatus(new Path(_preprocessingDir.getPath()));
+//    Assert.assertEquals(fileStatuses.length, numPartitions, "Number of output files should be the same as the number of partitions.");
+//
+//    Set<Integer> partitionIdSet = new HashSet<>();
+//    Object previousObject;
+//    for (FileStatus fileStatus : fileStatuses) {
+//      Path avroFile = fileStatus.getPath();
+//      DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>());
+//
+//      // Reset hash set and previous object
+//      partitionIdSet.clear();
+//      previousObject = null;
+//      while (dataFileStream.hasNext()) {
+//        GenericRecord genericRecord = dataFileStream.next();
+//        partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn)));
+//        Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should be the same within a file.");
+//        org.apache.avro.Schema sortedColumnSchema = genericRecord.getSchema().getField(sortedColumn).schema();
+//        Object currentObject = genericRecord.get(sortedColumn);
+//        if (previousObject == null) {
+//          previousObject = currentObject;
+//          continue;
+//        }
+//        // The values of sorted column should be sorted in ascending order.
+//        Assert.assertTrue(GenericData.get().compare(previousObject, currentObject, sortedColumnSchema) <= 0);
+//        previousObject = currentObject;
+//      }
+//    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org