Posted to commits@pinot.apache.org by jl...@apache.org on 2021/06/22 22:08:37 UTC
[incubator-pinot] branch master updated: Support data preprocessing
for AVRO and ORC formats (#7062)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 01b6c22 Support data preprocessing for AVRO and ORC formats (#7062)
01b6c22 is described below
commit 01b6c221059c7d90ff618c47b09ea6db338a0b54
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Tue Jun 22 15:07:53 2021 -0700
Support data preprocessing for AVRO and ORC formats (#7062)
Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
.../v0_deprecated/pinot-hadoop/pom.xml | 4 +
.../hadoop/job/HadoopSegmentPreprocessingJob.java | 382 +++++++--------------
.../pinot/hadoop/job/InternalConfigConstants.java | 13 +-
.../job/mappers/AvroDataPreprocessingMapper.java | 85 +++++
.../job/mappers/OrcDataPreprocessingMapper.java | 87 +++++
.../job/mappers/SegmentPreprocessingMapper.java | 53 ++-
.../AvroDataPreprocessingPartitioner.java | 77 +++++
.../OrcDataPreprocessingPartitioner.java | 83 +++++
.../preprocess/AvroDataPreprocessingHelper.java | 155 +++++++++
.../job/preprocess/DataPreprocessingHelper.java | 228 ++++++++++++
.../preprocess/DataPreprocessingHelperFactory.java | 55 +++
.../job/preprocess/OrcDataPreprocessingHelper.java | 231 +++++++++++++
...ucer.java => AvroDataPreprocessingReducer.java} | 45 +--
...ducer.java => OrcDataPreprocessingReducer.java} | 57 +--
.../hadoop/utils/preprocess/DataFileUtils.java | 62 ++++
.../utils/preprocess/DataPreprocessingUtils.java | 100 ++++++
.../pinot/hadoop/utils/preprocess/HadoopUtils.java | 41 +++
.../pinot/hadoop/utils/preprocess/OrcUtils.java | 88 +++++
.../hadoop/utils/preprocess/TextComparator.java | 41 +++
.../ingestion/jobs/SegmentPreprocessingJob.java | 26 +-
.../java/org/apache/pinot/spi/data/FieldSpec.java | 7 +
pom.xml | 5 +
22 files changed, 1569 insertions(+), 356 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
index 1c21482..339c832 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
@@ -200,6 +200,10 @@
<classifier>hadoop2</classifier>
</dependency>
<dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index b4e87fc..81a9902 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -27,66 +27,53 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory;
import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
import org.apache.pinot.ingestion.common.ControllerRestApi;
import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
-import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
+ * A Hadoop job that partitions, sorts, and resizes the input files, which contain raw data in either Avro or ORC format.
* Thus, the output files are partitioned, sorted, and resized after this job.
* In order to run this job, the following configs need to be specified in job properties:
* * enable.preprocessing: false by default. Enables preprocessing job.
*/
public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
- private static final Logger _logger = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
- protected FileSystem _fileSystem;
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
+
private String _partitionColumn;
private int _numPartitions;
private String _partitionFunction;
- private String _sortedColumn;
+
+ private String _sortingColumn;
+ private FieldSpec.DataType _sortingColumnType;
+
private int _numOutputFiles;
+ private int _maxNumRecordsPerFile;
+
private TableConfig _tableConfig;
private org.apache.pinot.spi.data.Schema _pinotTableSchema;
+ private Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
+
public HadoopSegmentPreprocessingJob(final Properties properties) {
super(properties);
}
@@ -94,102 +81,39 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
public void run()
throws Exception {
if (!_enablePreprocessing) {
- _logger.info("Pre-processing job is disabled.");
+ LOGGER.info("Pre-processing job is disabled.");
return;
} else {
- _logger.info("Starting {}", getClass().getSimpleName());
+ LOGGER.info("Starting {}", getClass().getSimpleName());
}
- _fileSystem = FileSystem.get(_inputSegmentDir.toUri(), getConf());
- final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir);
- Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory.");
-
- if (_fileSystem.exists(_preprocessedOutputDir)) {
- _logger.warn("Found output folder {}, deleting", _preprocessedOutputDir);
- _fileSystem.delete(_preprocessedOutputDir, true);
- }
setTableConfigAndSchema();
-
- _logger.info("Initializing a pre-processing job");
- Job job = Job.getInstance(getConf());
-
- Path sampleAvroPath = inputDataPaths.get(0);
- int numInputPaths = inputDataPaths.size();
-
- setValidationConfigs(job, sampleAvroPath);
- setHadoopJobConfigs(job, numInputPaths);
-
- // Avro Schema configs.
- Schema avroSchema = getSchema(sampleAvroPath);
- _logger.info("Schema is: {}", avroSchema.toString(true));
- setSchemaParams(job, avroSchema);
-
- // Set up mapper output key
- Set<Schema.Field> fieldSet = new HashSet<>();
-
+ fetchPreProcessingOperations();
fetchPartitioningConfig();
fetchSortingConfig();
fetchResizingConfig();
- validateConfigsAgainstSchema(avroSchema);
-
- // Partition configs.
- int numReduceTasks = 0;
- if (_partitionColumn != null) {
- numReduceTasks = _numPartitions;
- job.getConfiguration().set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
- job.setPartitionerClass(GenericPartitioner.class);
- job.getConfiguration().set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
- if (_partitionFunction != null) {
- job.getConfiguration().set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
- }
- job.getConfiguration().set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
- setMaxNumRecordsConfigIfSpecified(job);
- } else {
- if (_numOutputFiles > 0) {
- numReduceTasks = _numOutputFiles;
- } else {
- // default number of input paths
- numReduceTasks = inputDataPaths.size();
- }
- // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
- // so that all the rows can be spread evenly.
- addHashCodeField(fieldSet);
- }
- job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
- _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
- job.setNumReduceTasks(numReduceTasks);
-
- // Sort config.
- if (_sortedColumn != null) {
- _logger.info("Adding sorted column: {} to job config", _sortedColumn);
- job.getConfiguration().set(InternalConfigConstants.SORTED_COLUMN_CONFIG, _sortedColumn);
- addSortedColumnField(avroSchema, fieldSet);
- } else {
- // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
- addHashCodeField(fieldSet);
- }
+ // Cleans up the preprocessed output dir if it exists
+ cleanUpPreprocessedOutputs(_preprocessedOutputDir);
- // Creates a wrapper for the schema of output key in mapper.
- Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
- mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
- _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+ DataPreprocessingHelper dataPreprocessingHelper =
+ DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+ dataPreprocessingHelper
+ .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
+ _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile);
- AvroJob.setInputKeySchema(job, avroSchema);
- AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
- AvroJob.setMapOutputValueSchema(job, avroSchema);
- AvroJob.setOutputKeySchema(job, avroSchema);
+ Job job = dataPreprocessingHelper.setUpJob();
// Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
// distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
// implemented so that you know what it does.
- _logger.info("HDFS class path: " + _pathToDependencyJar);
+ LOGGER.info("HDFS class path: " + _pathToDependencyJar);
if (_pathToDependencyJar != null) {
- _logger.info("Copying jars locally.");
- PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+ LOGGER.info("Copying jars locally.");
+ PinotHadoopJobPreparationHelper
+ .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, job, _pathToDependencyJar);
} else {
- _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+ LOGGER.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
}
long startTime = System.currentTimeMillis();
@@ -199,11 +123,28 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
throw new RuntimeException("Job failed : " + job);
}
- _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+ LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+ }
+
+ private void fetchPreProcessingOperations() {
+ _preprocessingOperations = new HashSet<>();
+ TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+ if (customConfig != null) {
+ Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+ if (customConfigMap != null && !customConfigMap.isEmpty()) {
+ String preprocessingOperationsString =
+ customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+ DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString);
+ }
+ }
}
private void fetchPartitioningConfig() {
// Fetch partition info from table config.
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
+ LOGGER.info("Partitioning is disabled.");
+ return;
+ }
SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
if (segmentPartitionConfig != null) {
Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
@@ -215,123 +156,96 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
_partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
}
} else {
- _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+ LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
}
}
private void fetchSortingConfig() {
- // Fetch sorting info from table config.
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+ LOGGER.info("Sorting is disabled.");
+ return;
+ }
+ // Fetch sorting info from table config first.
+ List<String> sortingColumns = new ArrayList<>();
+ List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+ if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
+ for (FieldConfig fieldConfig : fieldConfigs) {
+ if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+ sortingColumns.add(fieldConfig.getName());
+ }
+ }
+ }
+ if (!sortingColumns.isEmpty()) {
+ Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table.");
+ _sortingColumn = sortingColumns.get(0);
+ return;
+ }
+
+ // There is no sorted column specified in field configs; try to find the sorted column from the indexing config.
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
List<String> sortedColumns = indexingConfig.getSortedColumn();
if (sortedColumns != null) {
Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
if (sortedColumns.size() == 1) {
- _sortedColumn = sortedColumns.get(0);
+ _sortingColumn = sortedColumns.get(0);
+ FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+ Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn);
+ Preconditions
+ .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+ _sortingColumnType = fieldSpec.getDataType();
+ Preconditions
+ .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
+ _sortingColumn);
+ LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
}
}
}
private void fetchResizingConfig() {
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
+ LOGGER.info("Resizing is disabled.");
+ return;
+ }
TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
if (tableCustomConfig == null) {
_numOutputFiles = 0;
return;
}
Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
- if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESS_NUM_FILES)) {
- _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESS_NUM_FILES));
+ if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+ _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
Preconditions.checkState(_numOutputFiles > 0, String
- .format("The value of %s should be positive! Current value: %s", InternalConfigConstants.PREPROCESS_NUM_FILES,
- _numOutputFiles));
+ .format("The value of %s should be positive! Current value: %s",
+ InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
} else {
_numOutputFiles = 0;
}
- }
- private void setMaxNumRecordsConfigIfSpecified(Job job) {
- TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
- if (tableCustomConfig == null) {
- return;
- }
- Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
- if (customConfigsMap != null && customConfigsMap
- .containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
- int maxNumRecords =
- Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+ if (customConfigsMap != null) {
+ int maxNumRecords;
+ if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+ LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
+ InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
+ InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
+ maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
+ } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
+ maxNumRecords =
+ Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
+ } else {
+ return;
+ }
+ // TODO: add a built-in maximum value for this config to avoid having too many small files.
+ // E.g. if the config is set to 1, which is smaller than this built-in value, the job should abort instead of generating too many small files.
Preconditions.checkArgument(maxNumRecords > 0,
- "The value of " + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE
+ "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
+ " should be positive. Current value: " + maxNumRecords);
- _logger.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords);
- job.getConfiguration()
- .set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(maxNumRecords));
+ LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+ _maxNumRecordsPerFile = maxNumRecords;
}
}
- /**
- * Finds the avro file in the input folder, and returns its avro schema
- * @param inputPathDir Path to input directory
- * @return Input schema
- * @throws IOException exception when accessing to IO
- */
- private Schema getSchema(Path inputPathDir)
- throws IOException {
- FileSystem fs = FileSystem.get(new Configuration());
- Schema avroSchema = null;
- for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
- if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
- _logger.info("Extracting schema from " + fileStatus.getPath());
- try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) {
- avroSchema = dataStreamReader.getSchema();
- }
- break;
- }
- }
- return avroSchema;
- }
-
- private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) {
- // Sorting is enabled. Adding sorted column value to mapper output key.
- Schema sortedColumnSchema = schema.getField(_sortedColumn).schema();
- Schema sortedColumnAsKeySchema;
- if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) {
- sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes());
- } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) {
- sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType());
- } else {
- sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType());
- }
- Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null);
- fieldSet.add(columnField);
- }
-
- private void validateConfigsAgainstSchema(Schema schema) {
- if (_partitionColumn != null) {
- Preconditions.checkArgument(schema.getField(_partitionColumn) != null,
- String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
- Preconditions.checkArgument(_numPartitions > 0,
- String.format("Number of partitions should be positive. Current value: %s", _numPartitions));
- Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
- try {
- PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
- } catch (IllegalArgumentException e) {
- _logger.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
- _partitionColumn);
- throw new IllegalArgumentException(e);
- }
- }
- if (_sortedColumn != null) {
- Preconditions.checkArgument(schema.getField(_sortedColumn) != null,
- String.format("Sorted column: %s is not found from the schema of input files.", _sortedColumn));
- }
- }
-
- private void addHashCodeField(Set<Schema.Field> fieldSet) {
- Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null);
- fieldSet.add(hashCodeField);
- }
-
@Override
- protected org.apache.pinot.spi.data.Schema getSchema()
+ protected Schema getSchema()
throws IOException {
try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
if (controllerRestApi != null) {
@@ -360,74 +274,14 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null");
}
- private void setValidationConfigs(Job job, Path path)
+ /**
+ * Cleans up outputs in preprocessed output directory.
+ */
+ public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
throws IOException {
- SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
-
- // TODO: Serialize and deserialize validation config by creating toJson and fromJson
- // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one,
- // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name
- // and value
- if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
- job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
- String timeColumnName = validationConfig.getTimeColumnName();
- job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
- if (timeColumnName != null) {
- DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
- if (dateTimeFieldSpec != null) {
- DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
- job.getConfiguration()
- .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
- job.getConfiguration()
- .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
- job.getConfiguration()
- .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
- }
- }
- job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
- IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
- try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(path)) {
- job.getConfiguration()
- .set(InternalConfigConstants.TIME_COLUMN_VALUE, dataStreamReader.next().get(timeColumnName).toString());
- }
- }
- }
-
- private void setHadoopJobConfigs(Job job, int numInputPaths) {
- job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
- // Turn this on to always firstly use class paths that user specifies.
- job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
- // Turn this off since we don't need an empty file in the output directory
- job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
- job.setJarByClass(HadoopSegmentPreprocessingJob.class);
-
- String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if (hadoopTokenFileLocation != null) {
- job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+ if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+ LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+ HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
}
-
- // Mapper configs.
- job.setMapperClass(SegmentPreprocessingMapper.class);
- job.setMapOutputKeyClass(AvroKey.class);
- job.setMapOutputValueClass(AvroValue.class);
- job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths);
-
- // Reducer configs.
- job.setReducerClass(SegmentPreprocessingReducer.class);
- job.setOutputKeyClass(AvroKey.class);
- job.setOutputValueClass(NullWritable.class);
- }
-
- private void setSchemaParams(Job job, Schema avroSchema)
- throws IOException {
- AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
- AvroMultipleOutputs.setCountersEnabled(job, true);
- // Use LazyOutputFormat to avoid creating empty files.
- LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
- // Input and output paths.
- FileInputFormat.setInputPaths(job, _inputSegmentDir);
- FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
}
}
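
For context, the rewritten job above is driven by job properties plus the table config fetched from the controller. A minimal sketch of launching it, assuming only the documented enable.preprocessing flag (the remaining input/output/table properties come from the SegmentPreprocessingJob base class and are elided here):

import java.util.Properties;
import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;

public class PreprocessingJobLauncher {
  public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    // false by default; the job is a no-op unless this is set to true.
    properties.setProperty("enable.preprocessing", "true");
    // ... plus the input dir, output dir, and table properties expected by
    // the SegmentPreprocessingJob base class (see JobConfigConstants).
    new HadoopSegmentPreprocessingJob(properties).run();
  }
}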
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index 3a7938d..3701db2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -31,19 +31,26 @@ public class InternalConfigConstants {
public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
public static final String SEGMENT_TIME_SDF_PATTERN = "segment.time.sdf.pattern";
+ // The preprocessing operations that are enabled.
+ public static final String PREPROCESS_OPERATIONS = "preprocessing.operations";
+
// Partitioning configs
public static final String PARTITION_COLUMN_CONFIG = "partition.column";
public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
- public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+ public static final String SORTING_COLUMN_CONFIG = "sorting.column";
+ public static final String SORTING_COLUMN_TYPE = "sorting.type";
public static final String ENABLE_PARTITIONING = "enable.partitioning";
- // max records per file in each partition. No effect otherwise.
+ @Deprecated
+ // Use PREPROCESSING_MAX_NUM_RECORDS_PER_FILE.
public static final String PARTITION_MAX_RECORDS_PER_FILE = "partition.max.records.per.file";
+ // max records per file in each partition. No effect otherwise.
+ public static final String PREPROCESSING_MAX_NUM_RECORDS_PER_FILE = "preprocessing.max.num.records.per.file";
// Number of segments we want generated.
- public static final String PREPROCESS_NUM_FILES = "preprocess.num.files";
+ public static final String PREPROCESSING_NUM_REDUCERS = "preprocessing.num.reducers";
public static final String FAIL_ON_SCHEMA_MISMATCH = "fail.on.schema.mismatch";
}
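
A hedged sketch of wiring these renamed keys through the table-level custom config; the comma-separated operation list is an assumption about what DataPreprocessingUtils.getOperations (not shown in this excerpt) parses, and all values are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.config.table.TableCustomConfig;

public class PreprocessingCustomConfigSketch {
  public static TableCustomConfig buildCustomConfig() {
    Map<String, String> customConfigs = new HashMap<>();
    // Assumed comma-separated operation list; values are examples only.
    customConfigs.put("preprocessing.operations", "partition,sort,resize");
    customConfigs.put("preprocessing.num.reducers", "16");
    customConfigs.put("preprocessing.max.num.records.per.file", "1000000");
    return new TableCustomConfig(customConfigs);
  }
}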
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
new file mode 100644
index 0000000..6278e8e
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroDataPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, WritableComparable, AvroValue<GenericRecord>> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingMapper.class);
+
+ private String _sortingColumn = null;
+ private FieldSpec.DataType _sortingColumnType = null;
+ private AvroRecordExtractor _avroRecordExtractor;
+
+ @Override
+ public void setup(Context context) {
+ Configuration configuration = context.getConfiguration();
+ _avroRecordExtractor = new AvroRecordExtractor();
+ String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+ if (sortingColumnConfig != null) {
+ _sortingColumn = sortingColumnConfig;
+ _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+ LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+ _sortingColumnType);
+ } else {
+ LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
+ }
+ }
+
+ @Override
+ public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
+ throws IOException, InterruptedException {
+ GenericRecord record = key.datum();
+ if (_sortingColumn != null) {
+ Object object = record.get(_sortingColumn);
+ Preconditions
+ .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
+ record);
+ Object convertedValue = _avroRecordExtractor.convert(object);
+ Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+ _sortingColumn, record);
+ WritableComparable outputKey;
+ try {
+ outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+ e);
+ }
+ context.write(outputKey, new AvroValue<>(record));
+ } else {
+ context.write(NullWritable.get(), new AvroValue<>(record));
+ }
+ }
+}
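
The mapper above delegates key construction to DataPreprocessingUtils.convertToWritableComparable, which is part of this commit but not shown in this excerpt. A sketch of the shape such a conversion plausibly takes, assuming it maps Pinot data types onto the Hadoop Writable types that DataPreprocessingHelper imports further below; this is an assumption, not the committed implementation:

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.pinot.spi.data.FieldSpec;

public class WritableConversionSketch {
  // Assumed shape of DataPreprocessingUtils.convertToWritableComparable; the
  // committed version lives in utils/preprocess/DataPreprocessingUtils.java.
  public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
    switch (dataType) {
      case INT:
        return new IntWritable((int) value);
      case LONG:
        return new LongWritable((long) value);
      case FLOAT:
        return new FloatWritable((float) value);
      case DOUBLE:
        return new DoubleWritable((double) value);
      case STRING:
        return new Text((String) value);
      default:
        throw new IllegalArgumentException("Unsupported sorting column type: " + dataType);
    }
  }
}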
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
new file mode 100644
index 0000000..d7d0694
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct, WritableComparable, OrcValue> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingMapper.class);
+
+ private final OrcValue _valueWrapper = new OrcValue();
+ private String _sortingColumn = null;
+ private FieldSpec.DataType _sortingColumnType = null;
+ private int _sortingColumnId = -1;
+
+ @Override
+ public void setup(Context context) {
+ Configuration configuration = context.getConfiguration();
+ String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+ if (sortingColumnConfig != null) {
+ _sortingColumn = sortingColumnConfig;
+ _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
+ LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+ _sortingColumnType);
+ } else {
+ LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
+ }
+ }
+
+ @Override
+ public void map(NullWritable key, OrcStruct value, Context context)
+ throws IOException, InterruptedException {
+ _valueWrapper.value = value;
+ if (_sortingColumn != null) {
+ if (_sortingColumnId == -1) {
+ List<String> fieldNames = value.getSchema().getFieldNames();
+ _sortingColumnId = fieldNames.indexOf(_sortingColumn);
+ Preconditions.checkState(_sortingColumnId != -1, "Failed to find sorting column: %s in the ORC fields: %s",
+ _sortingColumn, fieldNames);
+ LOGGER.info("Field id for sorting column: {} is: {}", _sortingColumn, _sortingColumnId);
+ }
+ WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
+ WritableComparable outputKey;
+ try {
+ outputKey = DataPreprocessingUtils
+ .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+ } catch (Exception e) {
+ throw new IllegalStateException(String
+ .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
+ _sortingColumnId, value), e);
+ }
+ context.write(outputKey, _valueWrapper);
+ } else {
+ context.write(NullWritable.get(), _valueWrapper);
+ }
+ }
+}
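
OrcUtils.convert is likewise part of this commit but not shown here; it unwraps the ORC field's WritableComparable into a plain Java value that convertToWritableComparable can re-wrap as the mapper output key. A hedged sketch of that unwrapping, again an assumption rather than the committed code:

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class OrcValueConversionSketch {
  // Assumed shape of OrcUtils.convert; the committed version lives in
  // utils/preprocess/OrcUtils.java and likely covers more ORC types.
  public static Object convert(WritableComparable value) {
    if (value instanceof IntWritable) {
      return ((IntWritable) value).get();
    }
    if (value instanceof LongWritable) {
      return ((LongWritable) value).get();
    }
    if (value instanceof FloatWritable) {
      return ((FloatWritable) value).get();
    }
    if (value instanceof DoubleWritable) {
      return ((DoubleWritable) value).get();
    }
    if (value instanceof Text) {
      return value.toString();
    }
    throw new IllegalArgumentException("Unsupported ORC value type: " + value.getClass());
  }
}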
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index e3f088d..3d3fcec 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
+ private Configuration _jobConf;
private String _sortedColumn = null;
private String _timeColumn = null;
private Schema _outputKeySchema;
@@ -52,43 +54,42 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
@Override
public void setup(final Context context) {
- Configuration configuration = context.getConfiguration();
-
- String tableName = configuration.get(JobConfigConstants.SEGMENT_TABLE_NAME);
-
- _isAppend = configuration.get(InternalConfigConstants.IS_APPEND).equalsIgnoreCase("true");
+ _jobConf = context.getConfiguration();
+ logConfigurations();
+ String tableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+ _isAppend = "true".equalsIgnoreCase(_jobConf.get(InternalConfigConstants.IS_APPEND));
if (_isAppend) {
// Get time column name
- _timeColumn = configuration.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
+ _timeColumn = _jobConf.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
// Get sample time column value
- String timeColumnValue = configuration.get(InternalConfigConstants.TIME_COLUMN_VALUE);
- String pushFrequency = configuration.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
+ String timeColumnValue = _jobConf.get(InternalConfigConstants.TIME_COLUMN_VALUE);
+ String pushFrequency = _jobConf.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
- String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
- String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
+ String timeType = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
+ String timeFormat = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
DateTimeFormatSpec dateTimeFormatSpec;
if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) {
dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat);
} else {
dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat,
- configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+ _jobConf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
}
_normalizedDateSegmentNameGenerator =
new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec);
_sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
}
- String sortedColumn = configuration.get(InternalConfigConstants.SORTED_COLUMN_CONFIG);
+ String sortedColumn = _jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
// Logging the configs for the mapper
LOGGER.info("Sorted Column: " + sortedColumn);
if (sortedColumn != null) {
_sortedColumn = sortedColumn;
}
- _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration);
- _outputSchema = AvroJob.getMapOutputValueSchema(configuration);
- _enablePartitioning = Boolean.parseBoolean(configuration.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
+ _outputKeySchema = AvroJob.getMapOutputKeySchema(_jobConf);
+ _outputSchema = AvroJob.getMapOutputValueSchema(_jobConf);
+ _enablePartitioning = Boolean.parseBoolean(_jobConf.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
}
@Override
@@ -130,4 +131,26 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
throw e;
}
}
+
+ protected void logConfigurations() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append('{');
+ boolean firstEntry = true;
+ for (Map.Entry<String, String> entry : _jobConf) {
+ if (!firstEntry) {
+ stringBuilder.append(", \n");
+ } else {
+ firstEntry = false;
+ }
+
+ stringBuilder.append(entry.getKey());
+ stringBuilder.append('=');
+ stringBuilder.append(entry.getValue());
+ }
+ stringBuilder.append('}');
+
+ LOGGER.info("*********************************************************************");
+ LOGGER.info("Job Configurations: {}", stringBuilder.toString());
+ LOGGER.info("*********************************************************************");
+ }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..74799c7
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroDataPreprocessingPartitioner extends Partitioner<WritableComparable, AvroValue<GenericRecord>> implements Configurable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingPartitioner.class);
+
+ private Configuration _conf;
+ private String _partitionColumn;
+ private PartitionFunction _partitionFunction;
+ private AvroRecordExtractor _avroRecordExtractor;
+
+ @Override
+ public void setConf(Configuration conf) {
+ _conf = conf;
+ _avroRecordExtractor = new AvroRecordExtractor();
+ _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+ String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+ int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+ _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+ LOGGER.info(
+ "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+ _partitionColumn, partitionFunctionName, numPartitions);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return _conf;
+ }
+
+ @Override
+ public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
+ GenericRecord record = value.datum();
+ Object object = record.get(_partitionColumn);
+ Preconditions
+ .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
+ record);
+ Object convertedValue = _avroRecordExtractor.convert(object);
+ Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
+ _partitionColumn, record);
+ Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+ "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
+ convertedValue.getClass(), record);
+ // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+ return _partitionFunction.getPartition(convertedValue.toString());
+ }
+}
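
As the NOTE in getPartition above says, the value is always hashed in its String form so that partition assignment matches what the Broker computes when pruning segments. A small usage sketch of the partition function factory; the function name and partition count are illustrative, and the import assumes the segment-spi factory that AvroDataPreprocessingHelper below also uses:

import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;

public class PartitionFunctionUsageSketch {
  public static void main(String[] args) {
    // "Murmur" and 8 partitions would normally come from the table's
    // segment partition config rather than being hard-coded.
    PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction("Murmur", 8);
    // Hash the String form of the value, matching the partitioners above.
    int partition = partitionFunction.getPartition("42");
    System.out.println("Partition column value 42 -> partition " + partition);
  }
}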
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..bef2cef
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingPartitioner extends Partitioner<WritableComparable, OrcValue> implements Configurable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingPartitioner.class);
+
+ private Configuration _conf;
+ private String _partitionColumn;
+ private PartitionFunction _partitionFunction;
+ private int _partitionColumnId = -1;
+
+ @Override
+ public void setConf(Configuration conf) {
+ _conf = conf;
+ _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+ String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+ int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+ _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+ LOGGER.info(
+ "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+ _partitionColumn, partitionFunctionName, numPartitions);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return _conf;
+ }
+
+ @Override
+ public int getPartition(WritableComparable key, OrcValue value, int numPartitions) {
+ OrcStruct orcStruct = (OrcStruct) value.value;
+ if (_partitionColumnId == -1) {
+ List<String> fieldNames = orcStruct.getSchema().getFieldNames();
+ _partitionColumnId = fieldNames.indexOf(_partitionColumn);
+ Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s",
+ _partitionColumn, fieldNames);
+ LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
+ }
+ WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
+ Object convertedValue;
+ try {
+ convertedValue = OrcUtils.convert(partitionColumnValue);
+ } catch (Exception e) {
+ throw new IllegalStateException(String
+ .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s", _partitionColumn,
+ _partitionColumnId, orcStruct), e);
+ }
+ // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+ return _partitionFunction.getPartition(convertedValue.toString());
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..9e5f5f2
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
+import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AvroDataPreprocessingHelper extends DataPreprocessingHelper {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingHelper.class);
+
+ public AvroDataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+ super(inputDataPaths, outputPath);
+ }
+
+ @Override
+ public Class<? extends Partitioner> getPartitioner() {
+ return AvroDataPreprocessingPartitioner.class;
+ }
+
+ @Override
+ public void setUpMapperReducerConfigs(Job job)
+ throws IOException {
+ Schema avroSchema = getAvroSchema(_sampleRawDataPath);
+ LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+ validateConfigsAgainstSchema(avroSchema);
+
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ job.setMapperClass(AvroDataPreprocessingMapper.class);
+
+ job.setReducerClass(AvroDataPreprocessingReducer.class);
+ AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
+ AvroMultipleOutputs.setCountersEnabled(job, true);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+ job.setOutputKeyClass(AvroKey.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ AvroJob.setInputKeySchema(job, avroSchema);
+ AvroJob.setMapOutputValueSchema(job, avroSchema);
+ AvroJob.setOutputKeySchema(job, avroSchema);
+ }
+
+ @Override
+ String getSampleTimeColumnValue(String timeColumnName)
+ throws IOException {
+ String sampleTimeColumnValue;
+ try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(_sampleRawDataPath)) {
+ sampleTimeColumnValue = dataStreamReader.next().get(timeColumnName).toString();
+ }
+ return sampleTimeColumnValue;
+ }
+
+ /**
+ * Finds the first Avro file in the input folder and returns its Avro schema.
+ * @param inputPathDir Path to input directory
+ * @return Input schema
+ * @throws IOException if an I/O error occurs
+ */
+ private Schema getAvroSchema(Path inputPathDir)
+ throws IOException {
+ Schema avroSchema = null;
+ for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
+ if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
+ LOGGER.info("Extracting schema from " + fileStatus.getPath());
+ try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(fileStatus.getPath())) {
+ avroSchema = dataStreamReader.getSchema();
+ }
+ break;
+ }
+ }
+ return avroSchema;
+ }
+
+ /**
+ * Helper method that returns an Avro reader for the given Avro file.
+ * If the file name ends in 'gz', returns a GZIP-wrapped reader; otherwise returns the regular reader.
+ *
+ * @param avroFile File to read
+ * @return Avro reader for the file.
+ * @throws IOException if an I/O error occurs
+ */
+ private DataFileStream<GenericRecord> getAvroReader(Path avroFile)
+ throws IOException {
+ FileSystem fs = FileSystem.get(new Configuration());
+ if (avroFile.getName().endsWith("gz")) {
+ return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>());
+ } else {
+ return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>());
+ }
+ }
+
+ private void validateConfigsAgainstSchema(Schema schema) {
+ if (_partitionColumn != null) {
+ Preconditions.checkArgument(schema.getField(_partitionColumn) != null,
+ String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
+ Preconditions.checkArgument(_numPartitions > 0,
+ String.format("Number of partitions should be positive. Current value: %s", _numPartitions));
+ Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
+ try {
+ PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
+ } catch (IllegalArgumentException e) {
+        LOGGER.error("Partition function must be one of Modulo, Murmur, ByteArray or HashCode, but is currently: {}",
+            _partitionFunction);
+ throw new IllegalArgumentException(e);
+ }
+ }
+ if (_sortingColumn != null) {
+ Preconditions.checkArgument(schema.getField(_sortingColumn) != null,
+ String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn));
+ }
+ }
+}
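For reference, a minimal standalone sketch of the schema extraction above (not part of the
patch; the class name is hypothetical, and it assumes avro and hadoop-common on the classpath):

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AvroSchemaPeek {
      // Reads the writer schema from the Avro file header without scanning any records.
      public static Schema peekSchema(Path avroFile) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        try (DataFileStream<GenericRecord> reader =
            new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>())) {
          return reader.getSchema();
        }
      }
    }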
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
new file mode 100644
index 0000000..a505d09
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class DataPreprocessingHelper {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class);
+
+ String _partitionColumn;
+ int _numPartitions;
+ String _partitionFunction;
+
+ String _sortingColumn;
+ private FieldSpec.DataType _sortingColumnType;
+
+ private int _numOutputFiles;
+ private int _maxNumRecordsPerFile;
+
+ private TableConfig _tableConfig;
+ private Schema _pinotTableSchema;
+
+ List<Path> _inputDataPaths;
+ Path _sampleRawDataPath;
+ Path _outputPath;
+
+ public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+ _inputDataPaths = inputDataPaths;
+ _sampleRawDataPath = inputDataPaths.get(0);
+ _outputPath = outputPath;
+ }
+
+ public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+ String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+ int maxNumRecordsPerFile) {
+ _tableConfig = tableConfig;
+ _pinotTableSchema = tableSchema;
+ _partitionColumn = partitionColumn;
+ _numPartitions = numPartitions;
+ _partitionFunction = partitionFunction;
+
+ _sortingColumn = sortingColumn;
+ _sortingColumnType = sortingColumnType;
+
+ _numOutputFiles = numOutputFiles;
+ _maxNumRecordsPerFile = maxNumRecordsPerFile;
+ }
+
+ public Job setUpJob()
+ throws IOException {
+ LOGGER.info("Initializing a pre-processing job");
+ Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+ Configuration jobConf = job.getConfiguration();
+ // Input and output paths.
+ int numInputPaths = _inputDataPaths.size();
+ jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+ setValidationConfigs(job, _sampleRawDataPath);
+ for (Path inputFile : _inputDataPaths) {
+ FileInputFormat.addInputPath(job, inputFile);
+ }
+ setHadoopJobConfigs(job);
+
+ // Sorting column
+ if (_sortingColumn != null) {
+ LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+
+ switch (_sortingColumnType) {
+ case INT:
+ job.setMapOutputKeyClass(IntWritable.class);
+ break;
+ case LONG:
+ job.setMapOutputKeyClass(LongWritable.class);
+ break;
+ case FLOAT:
+ job.setMapOutputKeyClass(FloatWritable.class);
+ break;
+ case DOUBLE:
+ job.setMapOutputKeyClass(DoubleWritable.class);
+ break;
+ case STRING:
+ job.setMapOutputKeyClass(Text.class);
+ job.setSortComparatorClass(TextComparator.class);
+ break;
+ default:
+          throw new IllegalStateException("Unsupported sorting column type: " + _sortingColumnType);
+ }
+ } else {
+ job.setMapOutputKeyClass(NullWritable.class);
+ }
+
+ // Partition column
+ int numReduceTasks = 0;
+ if (_partitionColumn != null) {
+ numReduceTasks = _numPartitions;
+ jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+ jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
+ if (_partitionFunction != null) {
+ jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+ }
+ jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+ job.setPartitionerClass(getPartitioner());
+ } else {
+ if (_numOutputFiles > 0) {
+ numReduceTasks = _numOutputFiles;
+ } else {
+        // Default to the number of input paths
+ numReduceTasks = _inputDataPaths.size();
+ }
+ }
+ // Maximum number of records per output file
+ jobConf
+ .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
+ // Number of reducers
+ LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+ job.setNumReduceTasks(numReduceTasks);
+
+ setUpMapperReducerConfigs(job);
+
+ return job;
+ }
+
+ abstract Class<? extends Partitioner> getPartitioner();
+
+ abstract void setUpMapperReducerConfigs(Job job)
+ throws IOException;
+
+ abstract String getSampleTimeColumnValue(String timeColumnName)
+ throws IOException;
+
+ private void setValidationConfigs(Job job, Path path)
+ throws IOException {
+ SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+
+    // TODO: Serialize and deserialize the validation config by creating toJson and fromJson
+    // For APPEND use cases, check that each input file contains only one time unit. If a file spans more than one,
+    // the job should be disabled, since such use cases must not be resized. To support this check, set the time
+    // column name and a sample time column value in the job config.
+ if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) {
+ job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
+      String timeColumnName = validationConfig.getTimeColumnName();
+      if (timeColumnName != null) {
+        job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
+        DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+        if (dateTimeFieldSpec != null) {
+          DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+          job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+          job.getConfiguration()
+              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
+          // The SDF pattern is null for epoch time formats, and Configuration.set() rejects null values.
+          if (formatSpec.getSDFPattern() != null) {
+            job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+          }
+        }
+        String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
+        if (sampleTimeColumnValue != null) {
+          job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue);
+        }
+      }
+      job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
+          IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+ }
+ }
+
+  private void setHadoopJobConfigs(Job job) {
+    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+    job.setJobName(getClass().getName());
+    FileOutputFormat.setOutputPath(job, _outputPath);
+    // Turn this on so that class paths specified by the user always take precedence.
+    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+    // Turn this off since we don't need an empty _SUCCESS marker file in the output directory.
+    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+ String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ if (hadoopTokenFileLocation != null) {
+ job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+ }
+ }
+}
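The switch on the sorting column type above works because the MapReduce shuffle sorts map
outputs by the map output key; a toy mapper illustrating the idea (hypothetical record format,
not this job's actual mapper):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SortByColumnMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        // Assume the sorting column is the first comma-separated field and holds a long;
        // emitting it as the key makes the shuffle deliver records in ascending order.
        long sortValue = Long.parseLong(line.toString().split(",", 2)[0]);
        context.write(new LongWritable(sortValue), line);
      }
    }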
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
new file mode 100644
index 0000000..2e91773
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataPreprocessingHelperFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelperFactory.class);
+
+  public static DataPreprocessingHelper generateDataPreprocessingHelper(Path inputDir, Path outputPath)
+      throws IOException {
+    final List<Path> avroFiles = DataFileUtils.getDataFiles(inputDir, DataFileUtils.AVRO_FILE_EXTENSION);
+    final List<Path> orcFiles = DataFileUtils.getDataFiles(inputDir, DataFileUtils.ORC_FILE_EXTENSION);
+
+    int numAvroFiles = avroFiles.size();
+    int numOrcFiles = orcFiles.size();
+    Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s under directory: %s", avroFiles, orcFiles,
+        inputDir);
+    Preconditions
+        .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file under directory: %s",
+            inputDir);
+
+    if (numAvroFiles > 0) {
+      LOGGER.info("Found AVRO files: {} under directory: {}", avroFiles, inputDir);
+      return new AvroDataPreprocessingHelper(avroFiles, outputPath);
+    } else {
+      LOGGER.info("Found ORC files: {} under directory: {}", orcFiles, inputDir);
+      return new OrcDataPreprocessingHelper(orcFiles, outputPath);
+    }
+  }
+}
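A hedged usage sketch of the factory and helper (the table config, schema, column names and
paths are placeholders; the actual driver in this patch is HadoopSegmentPreprocessingJob):

    // Pick the format-specific helper, register the preprocessing configs, and run the job.
    DataPreprocessingHelper helper = DataPreprocessingHelperFactory
        .generateDataPreprocessingHelper(new Path("/data/input"), new Path("/data/preprocessed"));
    helper.registerConfigs(tableConfig, tableSchema,
        "memberId", 8, "Murmur",                     // partition column, num partitions, function
        "eventTimeMillis", FieldSpec.DataType.LONG,  // sorting column and its type
        0, 100_000);                                 // num output files (0 = derived), max records per file
    Job job = helper.setUpJob();
    if (!job.waitForCompletion(true)) {
      throw new IllegalStateException("Data preprocessing job failed");
    }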
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..aec0bb0
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class OrcDataPreprocessingHelper extends DataPreprocessingHelper {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingHelper.class);
+
+ public OrcDataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+ super(inputDataPaths, outputPath);
+ }
+
+ @Override
+ Class<? extends Partitioner> getPartitioner() {
+ return OrcDataPreprocessingPartitioner.class;
+ }
+
+ @Override
+ void setUpMapperReducerConfigs(Job job) {
+ TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath);
+ String orcSchemaString = orcSchema.toString();
+ LOGGER.info("Orc schema is: {}", orcSchemaString);
+ validateConfigsAgainstSchema(orcSchema);
+
+ job.setInputFormatClass(OrcInputFormat.class);
+ job.setMapperClass(OrcDataPreprocessingMapper.class);
+ job.setMapOutputValueClass(OrcValue.class);
+ Configuration jobConf = job.getConfiguration();
+ OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+
+ job.setReducerClass(OrcDataPreprocessingReducer.class);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(OrcStruct.class);
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+ }
+
+ @Override
+ String getSampleTimeColumnValue(String timeColumnName)
+ throws IOException {
+    Reader.Options options = new Reader.Options().range(0, 1);
+    // Close both the file reader and the record reader when done.
+    try (Reader reader = OrcFile
+        .createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION));
+        RecordReader records = reader.rows(options)) {
+      TypeDescription orcSchema = reader.getSchema();
+      VectorizedRowBatch vectorizedRowBatch = orcSchema.createRowBatch();
+
+ if (records.nextBatch(vectorizedRowBatch)) {
+ List<String> orcFields = orcSchema.getFieldNames();
+ int numFields = orcFields.size();
+ for (int i = 0; i < numFields; i++) {
+ String fieldName = orcFields.get(i);
+ if (timeColumnName.equals(fieldName)) {
+ ColumnVector columnVector = vectorizedRowBatch.cols[i];
+ TypeDescription fieldType = orcSchema.getChildren().get(i);
+ TypeDescription.Category category = fieldType.getCategory();
+ return getValue(fieldName, columnVector, category);
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private String getValue(String field, ColumnVector columnVector, TypeDescription.Category category) {
+ switch (category) {
+ case BOOLEAN:
+ LongColumnVector longColumnVector = (LongColumnVector) columnVector;
+ if (longColumnVector.noNulls || !longColumnVector.isNull[0]) {
+ return Boolean.toString(longColumnVector.vector[0] == 1);
+ } else {
+ return null;
+ }
+ case BYTE:
+ case SHORT:
+ case INT:
+ // Extract to Integer
+ longColumnVector = (LongColumnVector) columnVector;
+ if (longColumnVector.noNulls || !longColumnVector.isNull[0]) {
+ return Integer.toString((int) longColumnVector.vector[0]);
+ } else {
+ return null;
+ }
+ case LONG:
+ case DATE:
+ // Extract to Long
+ longColumnVector = (LongColumnVector) columnVector;
+ if (longColumnVector.noNulls || !longColumnVector.isNull[0]) {
+ return Long.toString(longColumnVector.vector[0]);
+ } else {
+ return null;
+ }
+ case TIMESTAMP:
+ // Extract to Long
+ TimestampColumnVector timestampColumnVector = (TimestampColumnVector) columnVector;
+ if (timestampColumnVector.noNulls || !timestampColumnVector.isNull[0]) {
+ return Long.toString(timestampColumnVector.time[0]);
+ } else {
+ return null;
+ }
+ case FLOAT:
+ // Extract to Float
+ DoubleColumnVector doubleColumnVector = (DoubleColumnVector) columnVector;
+ if (doubleColumnVector.noNulls || !doubleColumnVector.isNull[0]) {
+ return Float.toString((float) doubleColumnVector.vector[0]);
+ } else {
+ return null;
+ }
+ case DOUBLE:
+ // Extract to Double
+ doubleColumnVector = (DoubleColumnVector) columnVector;
+ if (doubleColumnVector.noNulls || !doubleColumnVector.isNull[0]) {
+ return Double.toString(doubleColumnVector.vector[0]);
+ } else {
+ return null;
+ }
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ // Extract to String
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) columnVector;
+ if (bytesColumnVector.noNulls || !bytesColumnVector.isNull[0]) {
+ int length = bytesColumnVector.length[0];
+ return StringUtils.decodeUtf8(bytesColumnVector.vector[0], bytesColumnVector.start[0], length);
+ } else {
+ return null;
+ }
+ case BINARY:
+        // Copy the raw bytes and decode them as a UTF-8 string
+ bytesColumnVector = (BytesColumnVector) columnVector;
+ if (bytesColumnVector.noNulls || !bytesColumnVector.isNull[0]) {
+ int length = bytesColumnVector.length[0];
+ byte[] bytes = new byte[length];
+ System.arraycopy(bytesColumnVector.vector[0], bytesColumnVector.start[0], bytes, 0, length);
+ return new String(bytes, StandardCharsets.UTF_8);
+ } else {
+ return null;
+ }
+ default:
+ // Unsupported types
+ throw new IllegalStateException("Unsupported field type: " + category + " for field: " + field);
+ }
+ }
+
+  /**
+   * Returns the ORC schema of the given ORC file.
+   */
+ private TypeDescription getOrcSchema(Path orcFile) {
+ TypeDescription orcSchema;
+ try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
+ orcSchema = reader.getSchema();
+ } catch (Exception e) {
+ throw new IllegalStateException("Caught exception while extracting ORC schema from file: " + orcFile, e);
+ }
+ return orcSchema;
+ }
+
+ private void validateConfigsAgainstSchema(TypeDescription schema) {
+ List<String> fieldNames = schema.getFieldNames();
+ if (_partitionColumn != null) {
+ Preconditions.checkArgument(fieldNames.contains(_partitionColumn),
+ String.format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
+ Preconditions.checkArgument(_numPartitions > 0,
+ String.format("Number of partitions should be positive. Current value: %s", _numPartitions));
+ Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
+ try {
+ PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
+ } catch (IllegalArgumentException e) {
+        LOGGER.error("Partition function must be one of Modulo, Murmur, ByteArray or HashCode, but is currently: {}",
+            _partitionFunction);
+ throw new IllegalArgumentException(e);
+ }
+ }
+ if (_sortingColumn != null) {
+ Preconditions.checkArgument(fieldNames.contains(_sortingColumn),
+ String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn));
+ }
+ }
+}
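A standalone sketch of the first-batch read used by getSampleTimeColumnValue() above (the file
path is hypothetical; assumes orc-core and hadoop-common on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    public class OrcFirstBatchPeek {
      public static void main(String[] args) throws Exception {
        Path orcFile = new Path("/data/input/sample.orc");
        try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(new Configuration()));
            RecordReader rows = reader.rows(new Reader.Options().range(0, 1))) {
          // Row 0 of each ColumnVector in batch.cols holds the first record's values.
          VectorizedRowBatch batch = reader.getSchema().createRowBatch();
          if (rows.nextBatch(batch)) {
            System.out.println("Fields: " + reader.getSchema().getFieldNames() + ", batch size: " + batch.size);
          }
        }
      }
    }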
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
similarity index 63%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
index 9c07e6f..5fcbf10 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
@@ -19,7 +19,6 @@
package org.apache.pinot.hadoop.job.reducers;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
@@ -33,33 +32,43 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class AvroDataPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingReducer.class);
private AvroMultipleOutputs _multipleOutputs;
- private AtomicInteger _counter;
- private int _maxNumberOfRecords;
+ private long _numRecords;
+ private int _maxNumRecordsPerFile;
private String _filePrefix;
@Override
public void setup(Context context) {
- LOGGER.info("Using multiple outputs strategy.");
Configuration configuration = context.getConfiguration();
- _multipleOutputs = new AvroMultipleOutputs(context);
- _counter = new AtomicInteger();
// If it's 0, the output file won't be split into multiple files.
// If not, output file will be split when the number of records reaches this number.
- _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
- LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
- _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+ if (_maxNumRecordsPerFile > 0) {
+ LOGGER.info("Using multiple outputs strategy.");
+ _multipleOutputs = new AvroMultipleOutputs(context);
+ _numRecords = 0L;
+ _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ LOGGER.info("Initialized AvroDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+ } else {
+ LOGGER.info("Initialized AvroDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+ }
}
@Override
public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
throws IOException, InterruptedException {
- for (final AvroValue<GenericRecord> value : values) {
- String fileName = generateFileName();
- _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+ if (_maxNumRecordsPerFile > 0) {
+ for (final AvroValue<GenericRecord> value : values) {
+ String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+ _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+ }
+ } else {
+ for (final AvroValue<GenericRecord> value : values) {
+ context.write(new AvroKey<>(value.datum()), NullWritable.get());
+ }
}
}
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
}
LOGGER.info("Finished cleaning up reducer.");
}
-
- private String generateFileName() {
- if (_maxNumberOfRecords == 0) {
- return _filePrefix;
- } else {
- return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
- }
- }
}
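The file-rotation logic in the new reducer is plain integer division on a running record count;
a worked example (the prefix value is arbitrary, as it comes from RandomStringUtils):

    // With maxNumRecordsPerFile = 3, seven records yield file names
    // ab120, ab120, ab120, ab121, ab121, ab121, ab122.
    long numRecords = 0;
    int maxNumRecordsPerFile = 3;
    for (int i = 0; i < 7; i++) {
      System.out.println("ab12" + (numRecords++ / maxNumRecordsPerFile));
    }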
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
similarity index 54%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
index 9c07e6f..a3387a2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
@@ -19,47 +19,56 @@
package org.apache.pinot.hadoop.job.reducers;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
import org.apache.pinot.hadoop.job.InternalConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class OrcDataPreprocessingReducer extends Reducer<WritableComparable, OrcValue, NullWritable, OrcStruct> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingReducer.class);
- private AvroMultipleOutputs _multipleOutputs;
- private AtomicInteger _counter;
- private int _maxNumberOfRecords;
+ private int _maxNumRecordsPerFile;
+ private MultipleOutputs<NullWritable, OrcStruct> _multipleOutputs;
+ private long _numRecords;
private String _filePrefix;
@Override
public void setup(Context context) {
- LOGGER.info("Using multiple outputs strategy.");
Configuration configuration = context.getConfiguration();
- _multipleOutputs = new AvroMultipleOutputs(context);
- _counter = new AtomicInteger();
// If it's 0, the output file won't be split into multiple files.
// If not, output file will be split when the number of records reaches this number.
- _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
- LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
- _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 0);
+ if (_maxNumRecordsPerFile > 0) {
+ LOGGER.info("Using multiple outputs strategy.");
+ _multipleOutputs = new MultipleOutputs<>(context);
+ _numRecords = 0L;
+ _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ LOGGER.info("Initialized OrcDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+ } else {
+ LOGGER.info("Initialized OrcDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+ }
}
@Override
- public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+ public void reduce(WritableComparable key, Iterable<OrcValue> values, Context context)
throws IOException, InterruptedException {
- for (final AvroValue<GenericRecord> value : values) {
- String fileName = generateFileName();
- _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+ if (_maxNumRecordsPerFile > 0) {
+ for (final OrcValue value : values) {
+ String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+ _multipleOutputs.write(NullWritable.get(), (OrcStruct) value.value, fileName);
+ }
+ } else {
+ for (final OrcValue value : values) {
+ context.write(NullWritable.get(), (OrcStruct) value.value);
+ }
}
}
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
}
LOGGER.info("Finished cleaning up reducer.");
}
-
- private String generateFileName() {
- if (_maxNumberOfRecords == 0) {
- return _filePrefix;
- } else {
- return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
- }
- }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
new file mode 100644
index 0000000..58e1c1d
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+
+public class DataFileUtils {
+ private DataFileUtils() {
+ }
+
+ public static final String AVRO_FILE_EXTENSION = ".avro";
+ public static final String ORC_FILE_EXTENSION = ".orc";
+
+ /**
+ * Returns the data files under the input directory with the given file extension.
+ */
+ public static List<Path> getDataFiles(Path inputDir, String dataFileExtension)
+ throws IOException {
+ FileStatus fileStatus = HadoopUtils.DEFAULT_FILE_SYSTEM.getFileStatus(inputDir);
+ Preconditions.checkState(fileStatus.isDirectory(), "Path: %s is not a directory", inputDir);
+ List<Path> dataFiles = new ArrayList<>();
+ getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputDir), dataFileExtension, dataFiles);
+ return dataFiles;
+ }
+
+ private static void getDataFilesHelper(FileStatus[] fileStatuses, String dataFileExtension, List<Path> dataFiles)
+ throws IOException {
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ if (fileStatus.isDirectory()) {
+ getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(path), dataFileExtension, dataFiles);
+ } else {
+ Preconditions.checkState(fileStatus.isFile(), "Path: %s is neither a directory nor a file", path);
+ if (path.getName().endsWith(dataFileExtension)) {
+ dataFiles.add(path);
+ }
+ }
+ }
+ }
+}
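A short usage sketch for the recursive listing above (the input directory is a placeholder):

    // Collect all .avro files under /data/input, descending into subdirectories.
    List<Path> avroFiles =
        DataFileUtils.getDataFiles(new Path("/data/input"), DataFileUtils.AVRO_FILE_EXTENSION);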
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
new file mode 100644
index 0000000..bf399af
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.util.Set;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class DataPreprocessingUtils {
+ private DataPreprocessingUtils() {
+ }
+
+ /**
+ * Converts a value into {@link WritableComparable} based on the given data type.
+ * <p>NOTE: The passed in value must be either a Number or a String.
+ */
+ public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
+ if (value instanceof Number) {
+ Number numberValue = (Number) value;
+ switch (dataType) {
+ case INT:
+ return new IntWritable(numberValue.intValue());
+ case LONG:
+ return new LongWritable(numberValue.longValue());
+ case FLOAT:
+ return new FloatWritable(numberValue.floatValue());
+ case DOUBLE:
+ return new DoubleWritable(numberValue.doubleValue());
+ case STRING:
+ return new Text(numberValue.toString());
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " + dataType);
+ }
+ } else if (value instanceof String) {
+ String stringValue = (String) value;
+ switch (dataType) {
+ case INT:
+ return new IntWritable(Integer.parseInt(stringValue));
+ case LONG:
+ return new LongWritable(Long.parseLong(stringValue));
+ case FLOAT:
+ return new FloatWritable(Float.parseFloat(stringValue));
+ case DOUBLE:
+ return new DoubleWritable(Double.parseDouble(stringValue));
+ case STRING:
+ return new Text(stringValue);
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " + dataType);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Value: %s must be either a Number or a String, found: %s", value, value.getClass()));
+ }
+ }
+
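+  /**
+   * Parses a comma-separated, case-insensitive list of preprocessing operation names and adds the corresponding
+   * {@link Operation} values to the given set.
+   */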
+ public static Set<Operation> getOperations(Set<Operation> operationSet, String preprocessingOperationsString) {
+ String[] preprocessingOpsArray = preprocessingOperationsString.split(",");
+ for (String preprocessingOps : preprocessingOpsArray) {
+ operationSet.add(Operation.getOperation(preprocessingOps.trim().toUpperCase()));
+ }
+ return operationSet;
+ }
+
+ public enum Operation {
+ PARTITION,
+ SORT,
+ RESIZE;
+
+ public static Operation getOperation(String operationString) {
+ for (Operation operation : Operation.values()) {
+ if (operation.name().equals(operationString)) {
+ return operation;
+ }
+ }
+ throw new IllegalArgumentException("Unsupported data preprocessing operation: " + operationString);
+ }
+ }
+}
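A usage sketch for the conversion utilities above (values are illustrative):

    // Parse the configured operations, then build a shuffle key from a raw column value.
    Set<DataPreprocessingUtils.Operation> operations =
        DataPreprocessingUtils.getOperations(new HashSet<>(), "partition, sort"); // {PARTITION, SORT}
    WritableComparable sortKey =
        DataPreprocessingUtils.convertToWritableComparable(1622505600L, FieldSpec.DataType.LONG);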
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
new file mode 100644
index 0000000..0596259
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+
+public class HadoopUtils {
+ private HadoopUtils() {
+ }
+
+ public static final Configuration DEFAULT_CONFIGURATION;
+ public static final FileSystem DEFAULT_FILE_SYSTEM;
+
+ static {
+ DEFAULT_CONFIGURATION = new Configuration();
+ try {
+ DEFAULT_FILE_SYSTEM = FileSystem.get(DEFAULT_CONFIGURATION);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to get the default file system", e);
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
new file mode 100644
index 0000000..dcfc3b5
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.mapred.OrcTimestamp;
+
+
+public class OrcUtils {
+ private OrcUtils() {
+ }
+
+ /**
+ * Converts the ORC value into Number or String.
+ * <p>The following ORC types are supported:
+ * <ul>
+ * <li>IntWritable -> Integer</li>
+ * <li>LongWritable -> Long</li>
+ * <li>FloatWritable -> Float</li>
+ * <li>DoubleWritable -> Double</li>
+ * <li>Text -> String</li>
+ * <li>BooleanWritable -> String</li>
+ * <li>ByteWritable -> Byte</li>
+ * <li>ShortWritable -> Short</li>
+ * <li>DateWritable -> Long</li>
+ * <li>OrcTimestamp -> Long</li>
+ * </ul>
+ */
+ public static Object convert(WritableComparable orcValue) {
+ if (orcValue instanceof IntWritable) {
+ return ((IntWritable) orcValue).get();
+ }
+ if (orcValue instanceof LongWritable) {
+ return ((LongWritable) orcValue).get();
+ }
+ if (orcValue instanceof FloatWritable) {
+ return ((FloatWritable) orcValue).get();
+ }
+ if (orcValue instanceof DoubleWritable) {
+ return ((DoubleWritable) orcValue).get();
+ }
+ if (orcValue instanceof Text) {
+ return orcValue.toString();
+ }
+ if (orcValue instanceof BooleanWritable) {
+ return Boolean.toString(((BooleanWritable) orcValue).get());
+ }
+ if (orcValue instanceof ByteWritable) {
+ return ((ByteWritable) orcValue).get();
+ }
+ if (orcValue instanceof ShortWritable) {
+ return ((ShortWritable) orcValue).get();
+ }
+ if (orcValue instanceof DateWritable) {
+ return ((DateWritable) orcValue).get().getTime();
+ }
+ if (orcValue instanceof OrcTimestamp) {
+ return ((OrcTimestamp) orcValue).getTime();
+ }
+ throw new IllegalArgumentException(
+ String.format("Illegal ORC value: %s, class: %s", orcValue, orcValue.getClass()));
+ }
+}
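A usage sketch for the conversion above (values are illustrative):

    // Normalize ORC writables to plain Java values before building the shuffle key.
    Object intValue = OrcUtils.convert(new IntWritable(42));              // Integer 42
    Object strValue = OrcUtils.convert(new Text("abc"));                  // "abc"
    Object tsMillis = OrcUtils.convert(new OrcTimestamp(1622505600000L)); // Long epoch millis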
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
new file mode 100644
index 0000000..65f1222
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/**
+ * Overrides the Text comparison logic to compare the decoded Strings instead of the raw UTF-8 byte arrays,
+ * so that the shuffle sort order matches Java String ordering.
+ */
+public class TextComparator extends WritableComparator {
+ public TextComparator() {
+ super(Text.class);
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+ int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+ return StringUtils.decodeUtf8(b1, s1 + n1, l1 - n1).compareTo(StringUtils.decodeUtf8(b2, s2 + n2, l2 - n2));
+ }
+}
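The override matters because raw UTF-8 byte order and Java String ordering disagree outside the
Basic Multilingual Plane; a small illustration (characters chosen arbitrarily):

    // U+FFFD encodes as 3 UTF-8 bytes (EF BF BD) and U+1F600 as 4 bytes (F0 9F 98 80), so
    // byte-wise comparison puts U+FFFD first. String.compareTo() compares UTF-16 code units,
    // and the high surrogate 0xD83D sorts before 0xFFFD, reversing the order.
    String replacementChar = "\uFFFD";
    String emoji = "\uD83D\uDE00"; // U+1F600
    System.out.println(replacementChar.compareTo(emoji) > 0); // true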
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
index 2e9d98c..2e3b023 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
@@ -22,15 +22,11 @@ import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
-import java.util.zip.GZIPInputStream;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pinot.ingestion.common.ControllerRestApi;
import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,26 +71,8 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
protected abstract void run()
throws Exception;
- /**
- * Helper method that returns avro reader for the given avro file.
- * If file name ends in 'gz' then returns the GZIP version, otherwise gives the regular reader.
- *
- * @param avroFile File to read
- * @return Avro reader for the file.
- * @throws IOException exception when accessing to IO
- */
- protected DataFileStream<GenericRecord> getAvroReader(Path avroFile)
- throws IOException {
- FileSystem fs = FileSystem.get(new Configuration());
- if (avroFile.getName().endsWith("gz")) {
- return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>());
- } else {
- return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>());
- }
- }
-
@Override
- protected org.apache.pinot.spi.data.Schema getSchema()
+ protected Schema getSchema()
throws IOException {
try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
if (controllerRestApi != null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index f966f9b..7af16df 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -502,6 +502,13 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
throw new IllegalArgumentException(String.format("Cannot convert value: '%s' to type: %s", value, this));
}
}
+
+    /**
+     * Checks whether a column of this data type can be used as a sorted column.
+     */
+ public boolean canBeASortedColumn() {
+ return this != BYTES && this != JSON && this != STRUCT && this != MAP && this != LIST;
+ }
}
@Override
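A usage sketch for the new check (the schema and column name are placeholders):

    // Reject a configured sorting column whose data type cannot be sorted.
    FieldSpec.DataType dataType = tableSchema.getFieldSpecFor("eventTimeMillis").getDataType();
    Preconditions.checkState(dataType.canBeASortedColumn(),
        "Column of type: %s cannot be used as a sorted column", dataType);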
diff --git a/pom.xml b/pom.xml
index a116022..d3013da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -950,6 +950,11 @@
<classifier>hadoop2</classifier>
</dependency>
<dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ <version>1.5.9</version>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>