You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/07/23 20:07:24 UTC
[incubator-pinot] 01/01: Time-aware resizing
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch inputPathCompat
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4a0d4b11c4411b1b1179824bbdbf8ac94640143d
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Tue Jul 23 13:00:07 2019 -0700
Time-aware resizing
---
.../name/NormalizedDateSegmentNameGenerator.java | 18 +-
.../pinot/hadoop/job/InternalConfigConstants.java | 21 ++
.../pinot/hadoop/job/SegmentPreprocessingJob.java | 216 ++++++++++++++++++++-
.../job/mappers/SegmentPreprocessingMapper.java | 41 +++-
.../job/partitioners/GenericPartitioner.java | 8 +-
.../tests/BaseClusterIntegrationTest.java | 1 +
...mentBuildPushOfflineClusterIntegrationTest.java | 73 ++++++-
7 files changed, 363 insertions(+), 15 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index eadd3d9..ff1ecc1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -32,16 +32,20 @@ import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
* Segment name generator that normalizes the date to human readable format.
*/
public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator {
- private final String _segmentNamePrefix;
- private final boolean _excludeSequenceId;
- private final boolean _appendPushType;
+ private String _segmentNamePrefix;
+ private boolean _excludeSequenceId;
+ private boolean _appendPushType;
// For APPEND tables
- private final SimpleDateFormat _outputSDF;
+ private SimpleDateFormat _outputSDF;
// For EPOCH time format
- private final TimeUnit _inputTimeUnit;
+ private TimeUnit _inputTimeUnit;
// For SIMPLE_DATE_FORMAT time format
- private final SimpleDateFormat _inputSDF;
+ private SimpleDateFormat _inputSDF;
+
+ public NormalizedDateSegmentNameGenerator(@Nullable String pushFrequency, @Nullable TimeUnit timeType, @Nullable String timeFormat) {
+ new NormalizedDateSegmentNameGenerator("myTable", null, false, "APPEND", pushFrequency, timeType, timeFormat);
+ }
public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix,
boolean excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency, @Nullable TimeUnit timeType,
@@ -98,7 +102,7 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
* @param timeValue Time value
* @return Normalized date string
*/
- private String getNormalizedDate(Object timeValue) {
+ public String getNormalizedDate(Object timeValue) {
if (_inputTimeUnit != null) {
return _outputSDF.format(new Date(_inputTimeUnit.toMillis(Long.parseLong(timeValue.toString()))));
} else {
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
new file mode 100644
index 0000000..d674742
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -0,0 +1,21 @@
+package org.apache.pinot.hadoop.job;
+
+/**
+ * Internal-only constants for Hadoop MapReduce jobs. These constants are propagated across different segment creation
+ * jobs. They are not meant to be set externally.
+ */
+public class InternalConfigConstants {
+ public static final String TIME_COLUMN_CONFIG = "time.column";
+ public static final String TIME_COLUMN_VALUE = "time.column.value";
+ public static final String IS_APPEND = "is.append";
+ public static final String SEGMENT_PUSH_FREQUENCY = "segment.push.frequency";
+ public static final String SEGMENT_TIME_TYPE = "segment.time.type";
+ public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
+
+ // Partitioning configs
+ public static final String PARTITION_COLUMN_CONFIG = "partition.column";
+ public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+ public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+
+ public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index a3f3901..364ff08 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,6 +20,8 @@ package org.apache.pinot.hadoop.job;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -30,21 +32,43 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableCustomConfig;
+import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
+import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
+import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
import org.apache.pinot.hadoop.utils.PushLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+import static org.apache.hadoop.security.UserGroupInformation.*;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
/**
* A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
@@ -79,6 +103,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
private final String _defaultPermissionsMask;
private TableConfig _tableConfig;
+ private org.apache.pinot.common.data.Schema _schema;
protected FileSystem _fileSystem;
public SegmentPreprocessingJob(final Properties properties) {
@@ -137,8 +162,195 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
public void run()
throws Exception {
- _logger.info("Pre-processing job is disabled.");
- return;
+ // TODO: Remove once the job is ready
+ _enablePartitioning = false;
+ _enableSorting = false;
+ _enableResizing = false;
+
+ if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
+ _logger.info("Pre-processing job is disabled.");
+ return;
+ } else {
+ _logger.info("Starting {}", getClass().getSimpleName());
+ }
+
+ _fileSystem = FileSystem.get(_conf);
+ final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
+
+ if (_fileSystem.exists(_preprocessedOutputDir)) {
+ _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
+ _fileSystem.delete(_preprocessedOutputDir, true);
+ }
+ JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
+
+ // If push locations, table config, and schema are not configured, this does not necessarily mean that segments
+ // cannot be created. We should allow the user to go to the next step rather than failing the job.
+ if (_pushLocations.isEmpty()) {
+ _logger.error("Push locations cannot be empty. "
+ + "They are needed to get the table config and schema needed for this step. Skipping pre-processing");
+ return;
+ }
+
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ _tableConfig = controllerRestApi.getTableConfig();
+ _schema = controllerRestApi.getSchema();
+ }
+
+ if (_tableConfig == null) {
+ _logger.error("Table config cannot be null. Skipping pre-processing");
+ return;
+ }
+
+ if (_schema == null) {
+ _logger.error("Schema cannot be null. Skipping pre-processing");
+ }
+
+ SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+ _logger.info("Initializing a pre-processing job");
+ Job job = Job.getInstance(_conf);
+
+ // TODO: Serialize and deserialize validation config by creating toJson and fromJson
+ // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one,
+ // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name
+ // and value
+ if (validationConfig.getSegmentPushType().equalsIgnoreCase("APPEND")) {
+ job.getConfiguration().set(IS_APPEND, "true");
+ String timeColumnName = _schema.getTimeFieldSpec().getName();
+ job.getConfiguration().set(TIME_COLUMN_CONFIG, timeColumnName);
+ job.getConfiguration().set(SEGMENT_TIME_TYPE, validationConfig.getTimeType().toString());
+ job.getConfiguration().set(SEGMENT_TIME_FORMAT, _schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat());
+ job.getConfiguration().set(SEGMENT_PUSH_FREQUENCY, validationConfig.getSegmentPushFrequency());
+ DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputDataPath.get(0));
+ job.getConfiguration().set(TIME_COLUMN_VALUE, (String) dataStreamReader.next().get(timeColumnName));
+ dataStreamReader.close();
+ }
+
+ if (_enablePartitioning) {
+ fetchPartitioningConfig();
+ _logger.info("{}: {}", PARTITION_COLUMN_CONFIG, _partitionColumn);
+ _logger.info("{}: {}", NUM_PARTITIONS_CONFIG, _numberOfPartitions);
+ _logger.info("{}: {}", PARTITION_FUNCTION_CONFIG, _partitionColumn);
+ }
+
+ if (_enableSorting) {
+ fetchSortingConfig();
+ _logger.info("{}: {}", SORTED_COLUMN_CONFIG, _sortedColumn);
+ }
+
+ if (_enableResizing) {
+ fetchResizingConfig();
+ _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
+ }
+
+ job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+ // Turn this on to always firstly use class paths that user specifies.
+ job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+ // Turn this off since we don't need an empty file in the output directory
+ job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+ job.setJarByClass(SegmentPreprocessingJob.class);
+
+ String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+ if (hadoopTokenFileLocation != null) {
+ job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+ }
+
+ // Schema configs.
+ Schema schema = getSchema(inputDataPath.get(0));
+ _logger.info("Schema is: {}", schema.toString(true));
+
+ // Validates configs against schema.
+ validateConfigsAgainstSchema(schema);
+
+ // Mapper configs.
+ job.setMapperClass(SegmentPreprocessingMapper.class);
+ job.setMapOutputKeyClass(AvroKey.class);
+ job.setMapOutputValueClass(AvroValue.class);
+ job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
+
+ // Reducer configs.
+ job.setReducerClass(SegmentPreprocessingReducer.class);
+ job.setOutputKeyClass(AvroKey.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
+ AvroMultipleOutputs.setCountersEnabled(job, true);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+
+ // Input and output paths.
+ FileInputFormat.setInputPaths(job, _inputSegmentDir);
+ FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+ _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
+
+ // Set up mapper output key
+ Set<Schema.Field> fieldSet = new HashSet<>();
+
+ // Partition configs.
+ int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
+ if (_partitionColumn != null) {
+ job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
+ job.setPartitionerClass(GenericPartitioner.class);
+ job.getConfiguration().set(PARTITION_COLUMN_CONFIG, _partitionColumn);
+ if (_partitionFunction != null) {
+ job.getConfiguration().set(PARTITION_FUNCTION_CONFIG, _partitionFunction);
+ }
+ job.getConfiguration().set(NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
+ } else {
+ if (_numberOfOutputFiles > 0) {
+ numReduceTasks = _numberOfOutputFiles;
+ }
+ // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
+ // so that all the rows can be spread evenly.
+ addHashCodeField(fieldSet);
+ }
+ setMaxNumRecordsConfigIfSpecified(job);
+ job.setInputFormatClass(CombineAvroKeyInputFormat.class);
+
+ _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+ job.setNumReduceTasks(numReduceTasks);
+
+ // Sort config.
+ if (_sortedColumn != null) {
+ _logger.info("Adding sorted column: {} to job config", _sortedColumn);
+ job.getConfiguration().set(SORTED_COLUMN_CONFIG, _sortedColumn);
+
+ addSortedColumnField(schema, fieldSet);
+ } else {
+ // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
+ addHashCodeField(fieldSet);
+ }
+
+ // Creates a wrapper for the schema of output key in mapper.
+ Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
+ mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
+ _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+
+ AvroJob.setInputKeySchema(job, schema);
+ AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
+ AvroJob.setMapOutputValueSchema(job, schema);
+ AvroJob.setOutputKeySchema(job, schema);
+
+ // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
+ // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
+ // implemented so that you know what it does.
+ _logger.info("HDFS class path: " + _pathToDependencyJar);
+ if (_pathToDependencyJar != null) {
+ _logger.info("Copying jars locally.");
+ JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+ } else {
+ _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+ }
+
+ long startTime = System.currentTimeMillis();
+ // Submit the job for execution.
+ job.waitForCompletion(true);
+ if (!job.isSuccessful()) {
+ throw new RuntimeException("Job failed : " + job);
+ }
+
+ _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
}
@Nullable
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index 2c4d012..508ce62 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -29,24 +30,49 @@ import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
private String _sortedColumn = null;
+ private String _timeColumn = null;
private Schema _outputKeySchema;
private Schema _outputSchema;
private boolean _enablePartition = false;
+ private String _sampleNormalizedTimeColumnValue = null;
+ private NormalizedDateSegmentNameGenerator _normalizedDateSegmentNameGenerator = null;
+ private boolean _isAppend = false;
@Override
public void setup(final Context context) {
Configuration configuration = context.getConfiguration();
- String sortedColumn = configuration.get("sorted.column");
+ _isAppend = configuration.get(IS_APPEND).equalsIgnoreCase("true");
+
+ if (_isAppend) {
+ // Get time column name
+ _timeColumn = configuration.get(TIME_COLUMN_CONFIG);
+
+ // Get sample time column value
+ String timeColumnValue = configuration.get(TIME_COLUMN_VALUE);
+
+ String pushFrequency = configuration.get(SEGMENT_PUSH_FREQUENCY);
+ String timeType = configuration.get(SEGMENT_TIME_TYPE);
+ String timeFormat = configuration.get(SEGMENT_TIME_FORMAT);
+ TimeUnit timeUnit = TimeUnit.valueOf(timeType
+ );
+ // Normalize time column value
+ _normalizedDateSegmentNameGenerator = new NormalizedDateSegmentNameGenerator(pushFrequency, timeUnit, timeFormat);
+ _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+ }
+
+ String sortedColumn = configuration.get(SORTED_COLUMN_CONFIG);
// Logging the configs for the mapper
LOGGER.info("Sorted Column: " + sortedColumn);
if (sortedColumn != null) {
@@ -61,6 +87,19 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
@Override
public void map(AvroKey<GenericRecord> record, NullWritable value, final Context context)
throws IOException, InterruptedException {
+
+ if (_isAppend) {
+ // Normalize time column value and check against sample value
+ String timeColumnValue = (String) record.datum().get(_timeColumn);
+ String normalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+
+ if (!normalizedTimeColumnValue.equals(_sampleNormalizedTimeColumnValue)) {
+ // TODO: Create a custom exception and gracefully catch this exception outside, changing what the path to input
+ // into segment creation should be
+ throw new IllegalArgumentException("");
+ }
+ }
+
final GenericRecord inputRecord = record.datum();
final Schema schema = inputRecord.getSchema();
Preconditions.checkArgument(_outputSchema.equals(schema), "The schema of all avro files should be the same!");
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
index 43d1396..ca7e312 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
@@ -27,6 +27,8 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable {
@@ -39,10 +41,10 @@ public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecor
@Override
public void setConf(Configuration conf) {
_configuration = conf;
- _partitionColumn = _configuration.get("partition.column");
- _numPartitions = Integer.parseInt(_configuration.get("num.partitions"));
+ _partitionColumn = _configuration.get(PARTITION_COLUMN_CONFIG);
+ _numPartitions = Integer.parseInt(_configuration.get(NUM_PARTITIONS_CONFIG));
_partitionFunction =
- PartitionFunctionFactory.getPartitionFunction(_configuration.get("partition.function", null), _numPartitions);
+ PartitionFunctionFactory.getPartitionFunction(_configuration.get(PARTITION_FUNCTION_CONFIG, null), _numPartitions);
LOGGER.info("The partition function is: " + _partitionFunction.getClass().getName());
LOGGER.info("The partition column is: " + _partitionColumn);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 67fabc6..2eb17f5 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,6 +71,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
+ protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<KafkaServerStartable> _kafkaStarters;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 49903ef..7b39366 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.integration.tests;
import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -31,14 +32,20 @@ import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.hadoop.job.JobConfigConstants;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;
import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
@@ -66,7 +73,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Start the MR Yarn cluster
final Configuration conf = new Configuration();
- _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1);
+ _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
_mrCluster.init(conf);
_mrCluster.start();
@@ -93,7 +100,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Create the table
addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
- getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+ getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(), getSortedColumn());
// Generate and push Pinot segments from Hadoop
generateAndPushSegmentsFromHadoop();
@@ -161,6 +168,25 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost());
properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
+ Properties preComputeProperties = new Properties();
+ preComputeProperties.putAll(properties);
+ preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
+ preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
+
+ preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath());
+ preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath());
+// properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath());
+
+ // Run segment pre-processing job
+ SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties);
+ Configuration preComputeConfig = _mrCluster.getConfig();
+ segmentPreprocessingJob.setConf(preComputeConfig);
+ segmentPreprocessingJob.run();
+ LOGGER.info("Segment preprocessing job finished.");
+
+ // Verify partitioning and sorting.
+ verifyPreprocessingJob(preComputeConfig);
+
// Run segment creation job
SegmentCreationJob creationJob = new SegmentCreationJob(properties);
Configuration config = _mrCluster.getConfig();
@@ -172,4 +198,47 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
pushJob.setConf(_mrCluster.getConfig());
pushJob.run();
}
+
+ private void verifyPreprocessingJob(Configuration preComputeConfig) throws IOException {
+ // TODO: Uncomment once this job is released
+// // Fetch partitioning config and sorting config.
+// SegmentPartitionConfig segmentPartitionConfig = getSegmentPartitionConfig();
+// Map.Entry<String, ColumnPartitionConfig>
+// entry = segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+// String partitionColumn = entry.getKey();
+// String partitionFunctionString = entry.getValue().getFunctionName();
+// int numPartitions = entry.getValue().getNumPartitions();
+// PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionString, numPartitions);
+// String sortedColumn = getSortedColumn();
+//
+// // Get output files.
+// FileSystem fileSystem = FileSystem.get(preComputeConfig);
+// FileStatus[] fileStatuses = fileSystem.listStatus(new Path(_preprocessingDir.getPath()));
+// Assert.assertEquals(fileStatuses.length, numPartitions, "Number of output file should be the same as the number of partitions.");
+//
+// Set<Integer> partitionIdSet = new HashSet<>();
+// Object previousObject;
+// for (FileStatus fileStatus : fileStatuses) {
+// Path avroFile = fileStatus.getPath();
+// DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>());
+//
+// // Reset hash set and previous object
+// partitionIdSet.clear();
+// previousObject = null;
+// while (dataFileStream.hasNext()) {
+// GenericRecord genericRecord = dataFileStream.next();
+// partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn)));
+// Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should be the same within a file.");
+// org.apache.avro.Schema sortedColumnSchema = genericRecord.getSchema().getField(sortedColumn).schema();
+// Object currentObject = genericRecord.get(sortedColumn);
+// if (previousObject == null) {
+// previousObject = currentObject;
+// continue;
+// }
+// // The values of sorted column should be sorted in ascending order.
+// Assert.assertTrue(GenericData.get().compare(previousObject, currentObject, sortedColumnSchema) <= 0);
+// previousObject = currentObject;
+// }
+// }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org