You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/06/15 21:06:38 UTC
[incubator-pinot] 01/01: Support data preprocessing for AVRO and ORC formats
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch support-orc-format-preprocessing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 07d12179170eda0abef6ac5d9ad17ed869fc2ec4
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Jun 15 10:02:47 2021 -0700
Support data preprocessing for AVRO and ORC formats
---
.../v0_deprecated/pinot-hadoop/pom.xml | 4 +
.../hadoop/job/HadoopSegmentPreprocessingJob.java | 371 +++++++++++++--------
.../pinot/hadoop/job/InternalConfigConstants.java | 3 +-
.../job/mappers/AvroDataPreprocessingMapper.java | 85 +++++
.../job/mappers/OrcDataPreprocessingMapper.java | 87 +++++
.../job/mappers/SegmentPreprocessingMapper.java | 53 ++-
.../AvroDataPreprocessingPartitioner.java | 77 +++++
.../OrcDataPreprocessingPartitioner.java | 83 +++++
...ucer.java => AvroDataPreprocessingReducer.java} | 45 +--
...ducer.java => OrcDataPreprocessingReducer.java} | 57 ++--
.../hadoop/utils/preprocess/DataFileUtils.java | 62 ++++
.../utils/preprocess/DataPreprocessingUtils.java | 76 +++++
.../pinot/hadoop/utils/preprocess/HadoopUtils.java | 41 +++
.../pinot/hadoop/utils/preprocess/OrcUtils.java | 88 +++++
.../hadoop/utils/preprocess/RawDataFormat.java | 26 ++
.../hadoop/utils/preprocess/TextComparator.java | 41 +++
pom.xml | 5 +
17 files changed, 1001 insertions(+), 203 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
index a0ea8ec..b8218e2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
@@ -193,6 +193,10 @@
<classifier>hadoop2</classifier>
</dependency>
<dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index b4e87fc..c0edae7 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -21,7 +21,6 @@ package org.apache.pinot.hadoop.job;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -31,15 +30,20 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -48,11 +52,25 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
+import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
+import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
+import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.hadoop.utils.preprocess.RawDataFormat;
+import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
import org.apache.pinot.ingestion.common.ControllerRestApi;
import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
@@ -65,28 +83,36 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
+ * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in either Avro or Orc format.
* Thus, the output files are partitioned, sorted, resized after this job.
* In order to run this job, the following configs need to be specified in job properties:
* * enable.preprocessing: false by default. Enables preprocessing job.
*/
public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
- private static final Logger _logger = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
- protected FileSystem _fileSystem;
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
+
private String _partitionColumn;
private int _numPartitions;
private String _partitionFunction;
- private String _sortedColumn;
+
+ private String _sortingColumn;
+ private FieldSpec.DataType _sortingColumnType;
+
private int _numOutputFiles;
+ private int _maxNumRecordsPerFile;
+
private TableConfig _tableConfig;
private org.apache.pinot.spi.data.Schema _pinotTableSchema;
+ private Set<String> _preprocessingOperations;
+
public HadoopSegmentPreprocessingJob(final Properties properties) {
super(properties);
}
@@ -94,56 +120,102 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
public void run()
throws Exception {
if (!_enablePreprocessing) {
- _logger.info("Pre-processing job is disabled.");
+ LOGGER.info("Pre-processing job is disabled.");
return;
} else {
- _logger.info("Starting {}", getClass().getSimpleName());
+ LOGGER.info("Starting {}", getClass().getSimpleName());
}
- _fileSystem = FileSystem.get(_inputSegmentDir.toUri(), getConf());
- final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir);
- Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory.");
-
- if (_fileSystem.exists(_preprocessedOutputDir)) {
- _logger.warn("Found output folder {}, deleting", _preprocessedOutputDir);
- _fileSystem.delete(_preprocessedOutputDir, true);
- }
setTableConfigAndSchema();
-
- _logger.info("Initializing a pre-processing job");
- Job job = Job.getInstance(getConf());
-
- Path sampleAvroPath = inputDataPaths.get(0);
- int numInputPaths = inputDataPaths.size();
-
- setValidationConfigs(job, sampleAvroPath);
- setHadoopJobConfigs(job, numInputPaths);
-
- // Avro Schema configs.
- Schema avroSchema = getSchema(sampleAvroPath);
- _logger.info("Schema is: {}", avroSchema.toString(true));
- setSchemaParams(job, avroSchema);
-
- // Set up mapper output key
- Set<Schema.Field> fieldSet = new HashSet<>();
-
+ fetchPreProcessingOperations();
fetchPartitioningConfig();
fetchSortingConfig();
fetchResizingConfig();
- validateConfigsAgainstSchema(avroSchema);
- // Partition configs.
+ // Cleans up preprocessed output dir if exists
+ cleanUpPreprocessedOutputs(_preprocessedOutputDir);
+
+ final List<Path> avroFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.AVRO_FILE_EXTENSION);
+ final List<Path> orcFiles = DataFileUtils.getDataFiles(_inputSegmentDir, DataFileUtils.ORC_FILE_EXTENSION);
+
+ int numAvroFiles = avroFiles.size();
+ int numOrcFiles = orcFiles.size();
+ Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0,
+ "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+ _inputSegmentDir);
+ Preconditions
+ .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s",
+ _inputSegmentDir);
+ List<Path> inputDataPaths;
+ RawDataFormat rawDataFormat;
+ if (numAvroFiles > 0) {
+ rawDataFormat = RawDataFormat.AVRO;
+ inputDataPaths = avroFiles;
+ LOGGER.info("Find AVRO files: {} in directories: {}", avroFiles, _inputSegmentDir);
+ } else {
+ rawDataFormat = RawDataFormat.ORC;
+ inputDataPaths = orcFiles;
+ LOGGER.info("Find ORC files: {} in directories: {}", orcFiles, _inputSegmentDir);
+ }
+
+ LOGGER.info("Initializing a pre-processing job");
+ Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+ Configuration jobConf = job.getConfiguration();
+ // Input and output paths.
+ Path sampleRawDataPath = inputDataPaths.get(0);
+ int numInputPaths = inputDataPaths.size();
+ setValidationConfigs(job, sampleRawDataPath);
+ for (Path inputFile : inputDataPaths) {
+ FileInputFormat.addInputPath(job, inputFile);
+ }
+ setHadoopJobConfigs(job);
+
+ // Sorting column
+ if (_sortingColumn != null) {
+ LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+
+ switch (_sortingColumnType) {
+ case INT:
+ job.setMapOutputKeyClass(IntWritable.class);
+ break;
+ case LONG:
+ job.setMapOutputKeyClass(LongWritable.class);
+ break;
+ case FLOAT:
+ job.setMapOutputKeyClass(FloatWritable.class);
+ break;
+ case DOUBLE:
+ job.setMapOutputKeyClass(DoubleWritable.class);
+ break;
+ case STRING:
+ job.setMapOutputKeyClass(Text.class);
+ job.setSortComparatorClass(TextComparator.class);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ } else {
+ job.setMapOutputKeyClass(NullWritable.class);
+ }
+
+ // Partition column
int numReduceTasks = 0;
if (_partitionColumn != null) {
numReduceTasks = _numPartitions;
- job.getConfiguration().set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+ jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
job.setPartitionerClass(GenericPartitioner.class);
- job.getConfiguration().set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
+ jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn);
if (_partitionFunction != null) {
- job.getConfiguration().set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+ jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
+ }
+ jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+ if (rawDataFormat == RawDataFormat.AVRO) {
+ job.setPartitionerClass(AvroDataPreprocessingPartitioner.class);
+ } else {
+ job.setPartitionerClass(OrcDataPreprocessingPartitioner.class);
}
- job.getConfiguration().set(InternalConfigConstants.NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks));
- setMaxNumRecordsConfigIfSpecified(job);
} else {
if (_numOutputFiles > 0) {
numReduceTasks = _numOutputFiles;
@@ -151,45 +223,64 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
// default number of input paths
numReduceTasks = inputDataPaths.size();
}
- // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
- // so that all the rows can be spread evenly.
- addHashCodeField(fieldSet);
}
- job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
- _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+ // Maximum number of records per output file
+ jobConf.set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
+ // Number of reducers
+ LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
job.setNumReduceTasks(numReduceTasks);
- // Sort config.
- if (_sortedColumn != null) {
- _logger.info("Adding sorted column: {} to job config", _sortedColumn);
- job.getConfiguration().set(InternalConfigConstants.SORTED_COLUMN_CONFIG, _sortedColumn);
-
- addSortedColumnField(avroSchema, fieldSet);
+ // Mapper and reducer configs.
+ if (rawDataFormat == RawDataFormat.AVRO) {
+ Schema avroSchema = getAvroSchema(sampleRawDataPath);
+ LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+
+ // Set up mapper output key
+ validateConfigsAgainstSchema(avroSchema);
+
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ job.setMapperClass(AvroDataPreprocessingMapper.class);
+ jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+
+ job.setReducerClass(AvroDataPreprocessingReducer.class);
+ AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
+ AvroMultipleOutputs.setCountersEnabled(job, true);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+ job.setOutputKeyClass(AvroKey.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ AvroJob.setInputKeySchema(job, avroSchema);
+ AvroJob.setMapOutputValueSchema(job, avroSchema);
+ AvroJob.setOutputKeySchema(job, avroSchema);
} else {
- // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
- addHashCodeField(fieldSet);
+ String orcSchemaString = getOrcSchemaString(sampleRawDataPath);
+ LOGGER.info("Orc schema is: {}", orcSchemaString);
+
+ job.setInputFormatClass(OrcInputFormat.class);
+ job.setMapperClass(OrcDataPreprocessingMapper.class);
+ job.setMapOutputValueClass(OrcValue.class);
+ jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+ OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+
+ job.setReducerClass(OrcDataPreprocessingReducer.class);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(OrcStruct.class);
+ OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
}
- // Creates a wrapper for the schema of output key in mapper.
- Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
- mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
- _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
-
- AvroJob.setInputKeySchema(job, avroSchema);
- AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
- AvroJob.setMapOutputValueSchema(job, avroSchema);
- AvroJob.setOutputKeySchema(job, avroSchema);
-
// Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
// distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
// implemented so that you know what it does.
- _logger.info("HDFS class path: " + _pathToDependencyJar);
+ LOGGER.info("HDFS class path: " + _pathToDependencyJar);
if (_pathToDependencyJar != null) {
- _logger.info("Copying jars locally.");
- PinotHadoopJobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+ LOGGER.info("Copying jars locally.");
+ PinotHadoopJobPreparationHelper
+ .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, job, _pathToDependencyJar);
} else {
- _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+ LOGGER.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
}
long startTime = System.currentTimeMillis();
@@ -199,11 +290,30 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
throw new RuntimeException("Job failed : " + job);
}
- _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+ LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+ }
+
+ private void fetchPreProcessingOperations() {
+ _preprocessingOperations = new HashSet<>();
+ TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+ if (customConfig != null) {
+ Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+ if (customConfigMap != null && !customConfigMap.isEmpty()) {
+ String preprocessingOperationsString = customConfigMap.getOrDefault("preprocessing.operations", "");
+ String[] preprocessingOpsArray = preprocessingOperationsString.split(",");
+ for (String preprocessingOps : preprocessingOpsArray) {
+ _preprocessingOperations.add(preprocessingOps.trim().toLowerCase());
+ }
+ }
+ }
}
private void fetchPartitioningConfig() {
// Fetch partition info from table config.
+ if (!_preprocessingOperations.contains("partition")) {
+ LOGGER.info("Partitioning is disabled.");
+ return;
+ }
SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
if (segmentPartitionConfig != null) {
Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
@@ -215,23 +325,39 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
_partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
}
} else {
- _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+ LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
}
}
private void fetchSortingConfig() {
+ if (!_preprocessingOperations.contains("sort")) {
+ LOGGER.info("Sorting is disabled.");
+ return;
+ }
// Fetch sorting info from table config.
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
List<String> sortedColumns = indexingConfig.getSortedColumn();
if (sortedColumns != null) {
Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
if (sortedColumns.size() == 1) {
- _sortedColumn = sortedColumns.get(0);
+ _sortingColumn = sortedColumns.get(0);
+ FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+ Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn);
+ Preconditions
+ .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+ _sortingColumnType = fieldSpec.getDataType();
+ Preconditions.checkState(_sortingColumnType != FieldSpec.DataType.BYTES, "Cannot sort on BYTES column: %s",
+ _sortingColumn);
+ LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
}
}
}
private void fetchResizingConfig() {
+ if (!_preprocessingOperations.contains("resize")) {
+ LOGGER.info("Resizing is disabled.");
+ return;
+ }
TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
if (tableCustomConfig == null) {
_numOutputFiles = 0;
@@ -246,14 +372,7 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
} else {
_numOutputFiles = 0;
}
- }
- private void setMaxNumRecordsConfigIfSpecified(Job job) {
- TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
- if (tableCustomConfig == null) {
- return;
- }
- Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
if (customConfigsMap != null && customConfigsMap
.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
int maxNumRecords =
@@ -261,9 +380,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
Preconditions.checkArgument(maxNumRecords > 0,
"The value of " + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE
+ " should be positive. Current value: " + maxNumRecords);
- _logger.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords);
- job.getConfiguration()
- .set(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, Integer.toString(maxNumRecords));
+ LOGGER.info("Setting {} to {}", InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, maxNumRecords);
+ _maxNumRecordsPerFile = maxNumRecords;
}
}
@@ -273,13 +391,12 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
* @return Input schema
* @throws IOException exception when accessing to IO
*/
- private Schema getSchema(Path inputPathDir)
+ private Schema getAvroSchema(Path inputPathDir)
throws IOException {
- FileSystem fs = FileSystem.get(new Configuration());
Schema avroSchema = null;
- for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
+ for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
- _logger.info("Extracting schema from " + fileStatus.getPath());
+ LOGGER.info("Extracting schema from " + fileStatus.getPath());
try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputPathDir)) {
avroSchema = dataStreamReader.getSchema();
}
@@ -289,19 +406,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
return avroSchema;
}
- private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) {
- // Sorting is enabled. Adding sorted column value to mapper output key.
- Schema sortedColumnSchema = schema.getField(_sortedColumn).schema();
- Schema sortedColumnAsKeySchema;
- if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) {
- sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes());
- } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) {
- sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType());
- } else {
- sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType());
+ /**
+ * Finds the orc file and return its orc schema.
+ */
+ private String getOrcSchemaString(Path orcFile) {
+ String orcSchemaString;
+ try (Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
+ orcSchemaString = reader.getSchema().toString();
+ } catch (Exception e) {
+ throw new IllegalStateException("Caught exception while extracting ORC schema from file: " + orcFile, e);
}
- Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null);
- fieldSet.add(columnField);
+ return orcSchemaString;
}
private void validateConfigsAgainstSchema(Schema schema) {
@@ -314,22 +429,17 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
try {
PartitionFunctionFactory.PartitionFunctionType.fromString(_partitionFunction);
} catch (IllegalArgumentException e) {
- _logger.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
+ LOGGER.error("Partition function needs to be one of Modulo, Murmur, ByteArray, HashCode, it is currently {}",
_partitionColumn);
throw new IllegalArgumentException(e);
}
}
- if (_sortedColumn != null) {
- Preconditions.checkArgument(schema.getField(_sortedColumn) != null,
- String.format("Sorted column: %s is not found from the schema of input files.", _sortedColumn));
+ if (_sortingColumn != null) {
+ Preconditions.checkArgument(schema.getField(_sortingColumn) != null,
+ String.format("Sorted column: %s is not found from the schema of input files.", _sortingColumn));
}
}
- private void addHashCodeField(Set<Schema.Field> fieldSet) {
- Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null);
- fieldSet.add(hashCodeField);
- }
-
@Override
protected org.apache.pinot.spi.data.Schema getSchema()
throws IOException {
@@ -376,12 +486,10 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
if (dateTimeFieldSpec != null) {
DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
- job.getConfiguration()
- .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
+ job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
job.getConfiguration()
.set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
- job.getConfiguration()
- .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
+ job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
}
}
job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
@@ -393,41 +501,30 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
}
}
- private void setHadoopJobConfigs(Job job, int numInputPaths) {
+ private void setHadoopJobConfigs(Job job) {
+ job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+ job.setJobName(getClass().getName());
+ FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
// Turn this on to always firstly use class paths that user specifies.
job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
// Turn this off since we don't need an empty file in the output directory
job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
- job.setJarByClass(HadoopSegmentPreprocessingJob.class);
-
String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (hadoopTokenFileLocation != null) {
job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
}
-
- // Mapper configs.
- job.setMapperClass(SegmentPreprocessingMapper.class);
- job.setMapOutputKeyClass(AvroKey.class);
- job.setMapOutputValueClass(AvroValue.class);
- job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths);
-
- // Reducer configs.
- job.setReducerClass(SegmentPreprocessingReducer.class);
- job.setOutputKeyClass(AvroKey.class);
- job.setOutputValueClass(NullWritable.class);
}
- private void setSchemaParams(Job job, Schema avroSchema)
+ /**
+ * Cleans up outputs in preprocessed output directory.
+ */
+ public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
throws IOException {
- AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema);
- AvroMultipleOutputs.setCountersEnabled(job, true);
- // Use LazyOutputFormat to avoid creating empty files.
- LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
- // Input and output paths.
- FileInputFormat.setInputPaths(job, _inputSegmentDir);
- FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+ if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+ LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+ HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
+ }
}
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index 3a7938d..c44f0f9 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -36,7 +36,8 @@ public class InternalConfigConstants {
public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
- public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+ public static final String SORTING_COLUMN_CONFIG = "sorting.column";
+ public static final String SORTING_COLUMN_TYPE = "sorting.type";
public static final String ENABLE_PARTITIONING = "enable.partitioning";
// max records per file in each partition. No effect otherwise.
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
new file mode 100644
index 0000000..6278e8e
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper of the Avro data preprocessing job.
+ *
+ * <p>Every input record is written out wrapped in an {@link AvroValue}. When a sorting column is configured, the
+ * output key is that column's value converted into a {@link WritableComparable} for the configured data type;
+ * otherwise the output key is {@link NullWritable}.
+ */
+public class AvroDataPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, WritableComparable, AvroValue<GenericRecord>> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingMapper.class);
+
+  // Both stay null when the job is configured without a sorting column
+  private String _sortingColumn = null;
+  private FieldSpec.DataType _sortingColumnType = null;
+  private AvroRecordExtractor _avroRecordExtractor;
+
+  /**
+   * Reads the optional sorting column and its data type from the job configuration.
+   *
+   * @throws IllegalStateException if a sorting column is configured without its data type
+   */
+  @Override
+  public void setup(Context context) {
+    Configuration configuration = context.getConfiguration();
+    _avroRecordExtractor = new AvroRecordExtractor();
+    String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (sortingColumnConfig == null) {
+      LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
+      return;
+    }
+    // Fail with the name of the missing key instead of handing a null name to DataType.valueOf()
+    String sortingColumnTypeConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE);
+    Preconditions.checkState(sortingColumnTypeConfig != null, "Missing config: %s for sorting column: %s",
+        InternalConfigConstants.SORTING_COLUMN_TYPE, sortingColumnConfig);
+    _sortingColumn = sortingColumnConfig;
+    _sortingColumnType = FieldSpec.DataType.valueOf(sortingColumnTypeConfig);
+    LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+        _sortingColumnType);
+  }
+
+  /**
+   * Writes the record keyed by its sorting column value, or by {@link NullWritable} when no sorting column is set.
+   *
+   * @throws IllegalStateException if the record has no value for the sorting column, or if that value cannot be
+   *         converted into a map output key
+   */
+  @Override
+  public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
+      throws IOException, InterruptedException {
+    GenericRecord record = key.datum();
+    if (_sortingColumn == null) {
+      context.write(NullWritable.get(), new AvroValue<>(record));
+      return;
+    }
+    Object object = record.get(_sortingColumn);
+    Preconditions.checkState(object != null, "Failed to find value for sorting column: %s in record: %s",
+        _sortingColumn, record);
+    Object convertedValue = _avroRecordExtractor.convert(object);
+    Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+        _sortingColumn, record);
+    WritableComparable outputKey;
+    try {
+      outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+    } catch (Exception e) {
+      // Re-throw with the offending column and record so that the failing input can be identified from the task log
+      throw new IllegalStateException(
+          String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+          e);
+    }
+    context.write(outputKey, new AvroValue<>(record));
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
new file mode 100644
index 0000000..d7d0694
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper of the ORC data preprocessing job.
+ *
+ * <p>Every input struct is written out wrapped in an {@link OrcValue}. When a sorting column is configured, the output
+ * key is that column's value converted into a {@link WritableComparable} for the configured data type; otherwise the
+ * output key is {@link NullWritable}.
+ */
+public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct, WritableComparable, OrcValue> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingMapper.class);
+
+  // Single wrapper instance that is pointed at the current struct on every map() call
+  private final OrcValue _valueWrapper = new OrcValue();
+  // Both stay null when the job is configured without a sorting column
+  private String _sortingColumn = null;
+  private FieldSpec.DataType _sortingColumnType = null;
+  // Index of the sorting column within the ORC struct; -1 until it is resolved from the schema of a record
+  private int _sortingColumnId = -1;
+
+  /**
+   * Reads the optional sorting column and its data type from the job configuration.
+   *
+   * @throws IllegalStateException if a sorting column is configured without its data type
+   */
+  @Override
+  public void setup(Context context) {
+    Configuration configuration = context.getConfiguration();
+    String sortingColumnConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
+    if (sortingColumnConfig == null) {
+      LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
+      return;
+    }
+    // Fail with the name of the missing key instead of handing a null name to DataType.valueOf()
+    String sortingColumnTypeConfig = configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE);
+    Preconditions.checkState(sortingColumnTypeConfig != null, "Missing config: %s for sorting column: %s",
+        InternalConfigConstants.SORTING_COLUMN_TYPE, sortingColumnConfig);
+    _sortingColumn = sortingColumnConfig;
+    _sortingColumnType = FieldSpec.DataType.valueOf(sortingColumnTypeConfig);
+    LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
+        _sortingColumnType);
+  }
+
+  /**
+   * Writes the struct keyed by its sorting column value, or by {@link NullWritable} when no sorting column is set.
+   *
+   * @throws IllegalStateException if the sorting column is not part of the ORC schema, or if its value cannot be
+   *         converted into a map output key
+   */
+  @Override
+  public void map(NullWritable key, OrcStruct value, Context context)
+      throws IOException, InterruptedException {
+    _valueWrapper.value = value;
+    if (_sortingColumn == null) {
+      context.write(NullWritable.get(), _valueWrapper);
+      return;
+    }
+    if (_sortingColumnId == -1) {
+      // Resolve the field index once and reuse it for all following records
+      List<String> fieldNames = value.getSchema().getFieldNames();
+      _sortingColumnId = fieldNames.indexOf(_sortingColumn);
+      Preconditions.checkState(_sortingColumnId != -1, "Failed to find sorting column: %s in the ORC fields: %s",
+          _sortingColumn, fieldNames);
+      LOGGER.info("Field id for sorting column: {} is: {}", _sortingColumn, _sortingColumnId);
+    }
+    WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
+    WritableComparable outputKey;
+    try {
+      outputKey = DataPreprocessingUtils
+          .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+    } catch (Exception e) {
+      // Re-throw with the offending column and struct so that the failing input can be identified from the task log
+      throw new IllegalStateException(String
+          .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
+              _sortingColumnId, value), e);
+    }
+    context.write(outputKey, _valueWrapper);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index e3f088d..6decee1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
+ private Configuration _jobConf;
private String _sortedColumn = null;
private String _timeColumn = null;
private Schema _outputKeySchema;
@@ -52,43 +54,42 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
@Override
public void setup(final Context context) {
- Configuration configuration = context.getConfiguration();
-
- String tableName = configuration.get(JobConfigConstants.SEGMENT_TABLE_NAME);
-
- _isAppend = configuration.get(InternalConfigConstants.IS_APPEND).equalsIgnoreCase("true");
+ _jobConf = context.getConfiguration();
+ logConfigurations();
+ String tableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
+ _isAppend = "true".equalsIgnoreCase(_jobConf.get(InternalConfigConstants.IS_APPEND));
if (_isAppend) {
// Get time column name
- _timeColumn = configuration.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
+ _timeColumn = _jobConf.get(InternalConfigConstants.TIME_COLUMN_CONFIG);
// Get sample time column value
- String timeColumnValue = configuration.get(InternalConfigConstants.TIME_COLUMN_VALUE);
- String pushFrequency = configuration.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
+ String timeColumnValue = _jobConf.get(InternalConfigConstants.TIME_COLUMN_VALUE);
+ String pushFrequency = _jobConf.get(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY);
- String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
- String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
+ String timeType = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
+ String timeFormat = _jobConf.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
DateTimeFormatSpec dateTimeFormatSpec;
if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) {
dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat);
} else {
dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat,
- configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+ _jobConf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
}
_normalizedDateSegmentNameGenerator =
new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec);
_sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
}
- String sortedColumn = configuration.get(InternalConfigConstants.SORTED_COLUMN_CONFIG);
+ String sortedColumn = _jobConf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG);
// Logging the configs for the mapper
LOGGER.info("Sorted Column: " + sortedColumn);
if (sortedColumn != null) {
_sortedColumn = sortedColumn;
}
- _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration);
- _outputSchema = AvroJob.getMapOutputValueSchema(configuration);
- _enablePartitioning = Boolean.parseBoolean(configuration.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
+ _outputKeySchema = AvroJob.getMapOutputKeySchema(_jobConf);
+ _outputSchema = AvroJob.getMapOutputValueSchema(_jobConf);
+ _enablePartitioning = Boolean.parseBoolean(_jobConf.get(InternalConfigConstants.ENABLE_PARTITIONING, "false"));
}
@Override
@@ -130,4 +131,26 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
throw e;
}
}
+
+ /**
+  * Logs every key/value entry of the job configuration on a single line of the form "{key=value, key=value}",
+  * framed by two banner lines.
+  */
+ protected void logConfigurations() {
+   StringBuilder configDump = new StringBuilder();
+   configDump.append('{');
+   String separator = "";
+   for (Map.Entry<String, String> configEntry : _jobConf) {
+     // The separator is empty for the first entry only, so the dump never starts with ", "
+     configDump.append(separator).append(configEntry.getKey()).append('=').append(configEntry.getValue());
+     separator = ", ";
+   }
+   configDump.append('}');
+
+   String banner = "*********************************************************************";
+   LOGGER.info(banner);
+   LOGGER.info("Job Configurations: {}", configDump.toString());
+   LOGGER.info(banner);
+ }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..74799c7
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Partitioner of the Avro data preprocessing job.
+ *
+ * <p>The partition is computed by the configured partition function from the String form of the record's partition
+ * column value. The map output key and the {@code numPartitions} argument passed by the framework are not used.
+ */
+public class AvroDataPreprocessingPartitioner extends Partitioner<WritableComparable, AvroValue<GenericRecord>> implements Configurable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn;
+  private PartitionFunction _partitionFunction;
+  private AvroRecordExtractor _avroRecordExtractor;
+
+  /**
+   * Stores the configuration and builds the partition function from the partition column, the partition function
+   * name and the number of partitions found in it.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    _conf = conf;
+    _avroRecordExtractor = new AvroRecordExtractor();
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String functionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    int partitionCount = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(functionName, partitionCount);
+    LOGGER.info(
+        "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, functionName, partitionCount);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  /**
+   * Returns the partition assigned by the partition function to the record's partition column value.
+   *
+   * @throws IllegalStateException if the record has no usable value for the partition column
+   */
+  @Override
+  public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
+    Object partitionValue = extractPartitionValue(value.datum());
+    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+    return _partitionFunction.getPartition(partitionValue.toString());
+  }
+
+  /**
+   * Reads the partition column from the record, converts it with the Avro record extractor and verifies that the
+   * result is a non-null Number or String.
+   */
+  private Object extractPartitionValue(GenericRecord record) {
+    Object rawValue = record.get(_partitionColumn);
+    Preconditions.checkState(rawValue != null, "Failed to find value for partition column: %s in record: %s",
+        _partitionColumn, record);
+    Object partitionValue = _avroRecordExtractor.convert(rawValue);
+    Preconditions.checkState(partitionValue != null, "Invalid value: %s for partition column: %s in record: %s",
+        rawValue, _partitionColumn, record);
+    boolean supportedType = partitionValue instanceof Number || partitionValue instanceof String;
+    Preconditions.checkState(supportedType,
+        "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
+        partitionValue.getClass(), record);
+    return partitionValue;
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
new file mode 100644
index 0000000..bef2cef
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Partitioner of the ORC data preprocessing job.
+ *
+ * <p>The partition is computed by the configured partition function from the String form of the struct's partition
+ * column value. The map output key and the {@code numPartitions} argument passed by the framework are not used.
+ */
+public class OrcDataPreprocessingPartitioner extends Partitioner<WritableComparable, OrcValue> implements Configurable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingPartitioner.class);
+
+  private Configuration _conf;
+  private String _partitionColumn;
+  private PartitionFunction _partitionFunction;
+  // Index of the partition column within the ORC struct; -1 until it is resolved from the schema of a record
+  private int _partitionColumnId = -1;
+
+  /**
+   * Stores the configuration and builds the partition function from the partition column, the partition function
+   * name and the number of partitions found in it.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    _conf = conf;
+    _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
+    String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
+    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    LOGGER.info(
+        "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: {}",
+        _partitionColumn, partitionFunctionName, numPartitions);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return _conf;
+  }
+
+  /**
+   * Returns the partition assigned by the partition function to the struct's partition column value.
+   *
+   * @throws IllegalStateException if the partition column is not part of the ORC schema, if its value cannot be
+   *         converted, or if the converted value is null or neither a Number nor a String
+   */
+  @Override
+  public int getPartition(WritableComparable key, OrcValue value, int numPartitions) {
+    OrcStruct orcStruct = (OrcStruct) value.value;
+    if (_partitionColumnId == -1) {
+      // Resolve the field index once and reuse it for all following records
+      List<String> fieldNames = orcStruct.getSchema().getFieldNames();
+      _partitionColumnId = fieldNames.indexOf(_partitionColumn);
+      Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s",
+          _partitionColumn, fieldNames);
+      LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
+    }
+    WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
+    Object convertedValue;
+    try {
+      convertedValue = OrcUtils.convert(partitionColumnValue);
+    } catch (Exception e) {
+      throw new IllegalStateException(String
+          .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s", _partitionColumn,
+              _partitionColumnId, orcStruct), e);
+    }
+    // Same validation as the Avro partitioner: without it a null value surfaces as a bare NullPointerException, and a
+    // value of any other type would be partitioned by whatever its toString() happens to return
+    Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in ORC struct: %s",
+        partitionColumnValue, _partitionColumn, orcStruct);
+    Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+        "Value for partition column: %s must be either a Number or a String, found: %s in ORC struct: %s",
+        _partitionColumn, convertedValue.getClass(), orcStruct);
+    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+    return _partitionFunction.getPartition(convertedValue.toString());
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
similarity index 63%
copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
index 9c07e6f..7e37ec5 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
@@ -19,7 +19,6 @@
package org.apache.pinot.hadoop.job.reducers;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
@@ -33,33 +32,43 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class AvroDataPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroDataPreprocessingReducer.class);
private AvroMultipleOutputs _multipleOutputs;
- private AtomicInteger _counter;
- private int _maxNumberOfRecords;
+ private long _numRecords;
+ private int _maxNumRecordsPerFile;
private String _filePrefix;
@Override
public void setup(Context context) {
- LOGGER.info("Using multiple outputs strategy.");
Configuration configuration = context.getConfiguration();
- _multipleOutputs = new AvroMultipleOutputs(context);
- _counter = new AtomicInteger();
// If it's 0, the output file won't be split into multiple files.
// If not, output file will be split when the number of records reaches this number.
- _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
- LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
- _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
+ if (_maxNumRecordsPerFile > 0) {
+ LOGGER.info("Using multiple outputs strategy.");
+ _multipleOutputs = new AvroMultipleOutputs(context);
+ _numRecords = 0L;
+ _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ LOGGER.info("Initialized AvroDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+ } else {
+ LOGGER.info("Initialized AvroDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+ }
}
@Override
public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
throws IOException, InterruptedException {
- for (final AvroValue<GenericRecord> value : values) {
- String fileName = generateFileName();
- _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+ if (_maxNumRecordsPerFile > 0) {
+ for (final AvroValue<GenericRecord> value : values) {
+ String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+ _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+ }
+ } else {
+ for (final AvroValue<GenericRecord> value : values) {
+ context.write(new AvroKey<>(value.datum()), NullWritable.get());
+ }
}
}
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
}
LOGGER.info("Finished cleaning up reducer.");
}
-
- private String generateFileName() {
- if (_maxNumberOfRecords == 0) {
- return _filePrefix;
- } else {
- return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
- }
- }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
similarity index 54%
rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
index 9c07e6f..70bf0dd 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
@@ -19,47 +19,56 @@
package org.apache.pinot.hadoop.job.reducers;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
import org.apache.pinot.hadoop.job.InternalConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+public class OrcDataPreprocessingReducer extends Reducer<WritableComparable, OrcValue, NullWritable, OrcStruct> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OrcDataPreprocessingReducer.class);
- private AvroMultipleOutputs _multipleOutputs;
- private AtomicInteger _counter;
- private int _maxNumberOfRecords;
+ private int _maxNumRecordsPerFile;
+ private MultipleOutputs<NullWritable, OrcStruct> _multipleOutputs;
+ private long _numRecords;
private String _filePrefix;
@Override
public void setup(Context context) {
- LOGGER.info("Using multiple outputs strategy.");
Configuration configuration = context.getConfiguration();
- _multipleOutputs = new AvroMultipleOutputs(context);
- _counter = new AtomicInteger();
// If it's 0, the output file won't be split into multiple files.
// If not, output file will be split when the number of records reaches this number.
- _maxNumberOfRecords = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
- LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
- _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ _maxNumRecordsPerFile = configuration.getInt(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, 0);
+ if (_maxNumRecordsPerFile > 0) {
+ LOGGER.info("Using multiple outputs strategy.");
+ _multipleOutputs = new MultipleOutputs<>(context);
+ _numRecords = 0L;
+ _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ LOGGER.info("Initialized OrcDataPreprocessingReducer with maxNumRecordsPerFile: {}", _maxNumRecordsPerFile);
+ } else {
+ LOGGER.info("Initialized OrcDataPreprocessingReducer without limit on maxNumRecordsPerFile");
+ }
}
@Override
- public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+ public void reduce(WritableComparable key, Iterable<OrcValue> values, Context context)
throws IOException, InterruptedException {
- for (final AvroValue<GenericRecord> value : values) {
- String fileName = generateFileName();
- _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+ if (_maxNumRecordsPerFile > 0) {
+ for (final OrcValue value : values) {
+ String fileName = _filePrefix + (_numRecords++ / _maxNumRecordsPerFile);
+ _multipleOutputs.write(NullWritable.get(), (OrcStruct) value.value, fileName);
+ }
+ } else {
+ for (final OrcValue value : values) {
+ context.write(NullWritable.get(), (OrcStruct) value.value);
+ }
}
}
@@ -73,12 +82,4 @@ public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<Generic
}
LOGGER.info("Finished cleaning up reducer.");
}
-
- private String generateFileName() {
- if (_maxNumberOfRecords == 0) {
- return _filePrefix;
- } else {
- return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
- }
- }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
new file mode 100644
index 0000000..58e1c1d
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+
+public class DataFileUtils {
+  public static final String AVRO_FILE_EXTENSION = ".avro";
+  public static final String ORC_FILE_EXTENSION = ".orc";
+
+  private DataFileUtils() {
+  }
+
+  /**
+   * Collects every file under the input directory (searched recursively) whose name ends with the given extension.
+   */
+  public static List<Path> getDataFiles(Path inputDir, String dataFileExtension) throws IOException {
+    FileStatus inputDirStatus = HadoopUtils.DEFAULT_FILE_SYSTEM.getFileStatus(inputDir);
+    Preconditions.checkState(inputDirStatus.isDirectory(), "Path: %s is not a directory", inputDir);
+    List<Path> matchingFiles = new ArrayList<>();
+    getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputDir), dataFileExtension, matchingFiles);
+    return matchingFiles;
+  }
+
+  // Depth-first walk over the given entries: sub-directories are descended into, matching files are appended.
+  private static void getDataFilesHelper(FileStatus[] fileStatuses, String dataFileExtension, List<Path> dataFiles)
+      throws IOException {
+    for (FileStatus entry : fileStatuses) {
+      Path entryPath = entry.getPath();
+      if (entry.isDirectory()) {
+        getDataFilesHelper(HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(entryPath), dataFileExtension, dataFiles);
+        continue;
+      }
+      Preconditions.checkState(entry.isFile(), "Path: %s is neither a directory nor a file", entryPath);
+      if (entryPath.getName().endsWith(dataFileExtension)) {
+        dataFiles.add(entryPath);
+      }
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
new file mode 100644
index 0000000..234e2d3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class DataPreprocessingUtils {
+  private DataPreprocessingUtils() {
+  }
+
+  /**
+   * Converts a value into {@link WritableComparable} based on the given data type.
+   * <p>NOTE: The passed in value must be either a Number or a String.
+   * <p>Conversion rules:
+   * <ul>
+   *   <li>Number: converted with intValue(), longValue(), floatValue(), doubleValue() or toString()</li>
+   *   <li>String: parsed with Integer.parseInt(), Long.parseLong(), Float.parseFloat() or Double.parseDouble(),
+   *   or wrapped as is for STRING</li>
+   * </ul>
+   *
+   * @param value the value to convert, a Number or a String
+   * @param dataType the target data type, one of INT, LONG, FLOAT, DOUBLE or STRING
+   * @return an IntWritable, LongWritable, FloatWritable, DoubleWritable or Text holding the converted value
+   * @throws IllegalArgumentException if the value is null or neither a Number nor a String, or if the data type is
+   *         not one of the supported types
+   * @throws NumberFormatException if a String value cannot be parsed into the requested numeric type
+   */
+  public static WritableComparable convertToWritableComparable(Object value, FieldSpec.DataType dataType) {
+    boolean isNumber = value instanceof Number;
+    if (!isNumber && !(value instanceof String)) {
+      // instanceof is false for null, so getClass() is guarded to report null with this exception, not an NPE.
+      throw new IllegalArgumentException(
+          String.format("Value: %s must be either a Number or a String, found: %s", value,
+              value == null ? null : value.getClass()));
+    }
+    // One switch serves both input kinds: numbers go through xxxValue(), strings through parseXxx().
+    switch (dataType) {
+      case INT:
+        return new IntWritable(isNumber ? ((Number) value).intValue() : Integer.parseInt((String) value));
+      case LONG:
+        return new LongWritable(isNumber ? ((Number) value).longValue() : Long.parseLong((String) value));
+      case FLOAT:
+        return new FloatWritable(isNumber ? ((Number) value).floatValue() : Float.parseFloat((String) value));
+      case DOUBLE:
+        return new DoubleWritable(isNumber ? ((Number) value).doubleValue() : Double.parseDouble((String) value));
+      case STRING:
+        // Number.toString() for numbers; String.toString() returns the string itself.
+        return new Text(value.toString());
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
new file mode 100644
index 0000000..0596259
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+
+public class HadoopUtils {
+  public static final Configuration DEFAULT_CONFIGURATION = new Configuration();
+  public static final FileSystem DEFAULT_FILE_SYSTEM = loadDefaultFileSystem();
+
+  private HadoopUtils() {
+  }
+
+  // Looks up the file system for DEFAULT_CONFIGURATION; the checked IOException is rethrown unchecked.
+  private static FileSystem loadDefaultFileSystem() {
+    try {
+      return FileSystem.get(DEFAULT_CONFIGURATION);
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to get the default file system", e);
+    }
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
new file mode 100644
index 0000000..dcfc3b5
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.mapred.OrcTimestamp;
+
+
+public class OrcUtils {
+  private OrcUtils() {
+  }
+
+  /**
+   * Converts the ORC value into Number or String.
+   * <p>The following ORC types are supported:
+   * <ul>
+   *   <li>IntWritable -> Integer</li>
+   *   <li>LongWritable -> Long</li>
+   *   <li>FloatWritable -> Float</li>
+   *   <li>DoubleWritable -> Double</li>
+   *   <li>Text -> String</li>
+   *   <li>BooleanWritable -> String</li>
+   *   <li>ByteWritable -> Byte</li>
+   *   <li>ShortWritable -> Short</li>
+   *   <li>DateWritable / OrcTimestamp -> Long (from getTime())</li>
+   * </ul>
+   * @throws IllegalArgumentException if the value is null or of any type not listed above
+   */
+  public static Object convert(WritableComparable orcValue) {
+    if (orcValue instanceof IntWritable) {
+      return ((IntWritable) orcValue).get();
+    }
+    if (orcValue instanceof LongWritable) {
+      return ((LongWritable) orcValue).get();
+    }
+    if (orcValue instanceof FloatWritable) {
+      return ((FloatWritable) orcValue).get();
+    }
+    if (orcValue instanceof DoubleWritable) {
+      return ((DoubleWritable) orcValue).get();
+    }
+    if (orcValue instanceof Text) {
+      return orcValue.toString();
+    }
+    if (orcValue instanceof BooleanWritable) {
+      return Boolean.toString(((BooleanWritable) orcValue).get());
+    }
+    if (orcValue instanceof ByteWritable) {
+      return ((ByteWritable) orcValue).get();
+    }
+    if (orcValue instanceof ShortWritable) {
+      return ((ShortWritable) orcValue).get();
+    }
+    if (orcValue instanceof DateWritable) {
+      return ((DateWritable) orcValue).get().getTime();
+    }
+    if (orcValue instanceof OrcTimestamp) {
+      return ((OrcTimestamp) orcValue).getTime();
+    }
+    throw new IllegalArgumentException(String.format("Illegal ORC value: %s, class: %s", orcValue,
+        orcValue == null ? null : orcValue.getClass()));
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
new file mode 100644
index 0000000..0ae3ca3
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/RawDataFormat.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+/** The raw data formats currently supported by the Pinot preprocessing jobs. */
+public enum RawDataFormat {
+  // Avro data files and ORC data files, respectively.
+  AVRO,
+  ORC
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
new file mode 100644
index 0000000..65f1222
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils.preprocess;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/** Compares serialized {@link Text} values by their decoded String instead of by the raw byte array. */
+public class TextComparator extends WritableComparator {
+  public TextComparator() {
+    super(Text.class);
+  }
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    // decodeVIntSize yields the byte size of the leading vint; decoding starts right after it.
+    int prefixSize1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int prefixSize2 = WritableUtils.decodeVIntSize(b2[s2]);
+    String left = StringUtils.decodeUtf8(b1, s1 + prefixSize1, l1 - prefixSize1);
+    return left.compareTo(StringUtils.decodeUtf8(b2, s2 + prefixSize2, l2 - prefixSize2));
+  }
+}
diff --git a/pom.xml b/pom.xml
index aaf122f..7337462 100644
--- a/pom.xml
+++ b/pom.xml
@@ -944,6 +944,11 @@
<classifier>hadoop2</classifier>
</dependency>
<dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ <version>1.5.9</version>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org