Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/27 21:04:17 UTC
[incubator-pinot] 01/01: Disable preprocess
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch disablePreprocess
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f4fdc7639c52fc4a039dd44632fd1c6196580851
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Thu Jun 27 14:04:03 2019 -0700
Disable preprocess
---
.../pinot/hadoop/job/SegmentPreprocessingJob.java | 171 +--------------------
1 file changed, 2 insertions(+), 169 deletions(-)
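The net effect: run() no longer configures or submits the MapReduce job at all. Reconstructed from the + lines in the diff below, the method now reduces to:

    public void run()
        throws Exception {
      _logger.info("Pre-processing job is disabled.");
      return;
    }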
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index aadeb3b..a3f3901 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,8 +20,6 @@ package org.apache.pinot.hadoop.job;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -32,41 +30,21 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableCustomConfig;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.JobPreparationHelper;
import org.apache.pinot.hadoop.utils.PushLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.mapreduce.MRJobConfig.*;
-import static org.apache.hadoop.security.UserGroupInformation.*;
-
/**
* A Hadoop job that partitions, sorts, and resizes the input files, which are raw data in Avro format.
@@ -159,153 +137,8 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
public void run()
throws Exception {
- if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
- _logger.info("Pre-processing job is disabled.");
- return;
- } else {
- _logger.info("Starting {}", getClass().getSimpleName());
- }
-
- _fileSystem = FileSystem.get(_conf);
- final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
-
- if (_fileSystem.exists(_preprocessedOutputDir)) {
- _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
- _fileSystem.delete(_preprocessedOutputDir, true);
- }
- JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
-
- _tableConfig = getTableConfig();
- Preconditions.checkArgument(_tableConfig != null, "Table config shouldn't be null");
-
- if (_enablePartitioning) {
- fetchPartitioningConfig();
- _logger.info("partition.column: {}", _partitionColumn);
- _logger.info("num.partitions: {}", _numberOfPartitions);
- _logger.info("partition.function: {}", _partitionColumn);
- }
-
- if (_enableSorting) {
- fetchSortingConfig();
- _logger.info("sorted.column: {}", _sortedColumn);
- }
-
- if (_enableResizing) {
- fetchResizingConfig();
- _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
- }
-
- _logger.info("Initializing a pre-processing job");
- Job job = Job.getInstance(_conf);
-
- job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
- // Turn this on so that the class paths the user specifies are always used first.
- job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
- // Turn this off since we don't need the empty _SUCCESS marker file in the output directory
- job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
- job.setJarByClass(SegmentPreprocessingJob.class);
-
- String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
- if (hadoopTokenFileLocation != null) {
- job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
- }
-
- // Schema configs.
- Schema schema = getSchema(inputDataPath.get(0));
- _logger.info("Schema is: {}", schema.toString(true));
-
- // Validates configs against schema.
- validateConfigsAgainstSchema(schema);
-
- // Mapper configs.
- job.setMapperClass(SegmentPreprocessingMapper.class);
- job.setMapOutputKeyClass(AvroKey.class);
- job.setMapOutputValueClass(AvroValue.class);
- job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
-
- // Reducer configs.
- job.setReducerClass(SegmentPreprocessingReducer.class);
- job.setOutputKeyClass(AvroKey.class);
- job.setOutputValueClass(NullWritable.class);
-
- AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
- AvroMultipleOutputs.setCountersEnabled(job, true);
- // Use LazyOutputFormat to avoid creating empty files.
- LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
- // Input and output paths.
- FileInputFormat.setInputPaths(job, _inputSegmentDir);
- FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
- _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
-
- // Set up mapper output key
- Set<Schema.Field> fieldSet = new HashSet<>();
-
- // Partition configs.
- int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
- if (_partitionColumn != null) {
- job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
- job.setPartitionerClass(GenericPartitioner.class);
- job.getConfiguration().set("partition.column", _partitionColumn);
- if (_partitionFunction != null) {
- job.getConfiguration().set("partition.function", _partitionFunction);
- }
- job.getConfiguration().set("num.partitions", Integer.toString(numReduceTasks));
- } else {
- if (_numberOfOutputFiles > 0) {
- numReduceTasks = _numberOfOutputFiles;
- }
- // Partitioning is disabled. Add the hashcode as one of the fields in the mapper output key
- // so that all the rows can be spread evenly.
- addHashCodeField(fieldSet);
- }
- setMaxNumRecordsConfigIfSpecified(job);
- job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
- _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
- job.setNumReduceTasks(numReduceTasks);
-
- // Sort config.
- if (_sortedColumn != null) {
- _logger.info("Adding sorted column: {} to job config", _sortedColumn);
- job.getConfiguration().set("sorted.column", _sortedColumn);
-
- addSortedColumnField(schema, fieldSet);
- } else {
- // If sorting is disabled, the hashcode will be the only factor in the sort/group comparator.
- addHashCodeField(fieldSet);
- }
-
- // Creates a wrapper for the schema of output key in mapper.
- Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
- mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
- _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
-
- AvroJob.setInputKeySchema(job, schema);
- AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
- AvroJob.setMapOutputValueSchema(job, schema);
- AvroJob.setOutputKeySchema(job, schema);
-
- // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to the
- // distributed cache ourselves. Take a look at how addFilesToDistributedCache is implemented
- // so that you know what it does.
- _logger.info("HDFS class path: " + _pathToDependencyJar);
- if (_pathToDependencyJar != null) {
- _logger.info("Copying jars locally.");
- JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
- } else {
- _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
- }
-
- long startTime = System.currentTimeMillis();
- // Submit the job for execution.
- job.waitForCompletion(true);
- if (!job.isSuccessful()) {
- throw new RuntimeException("Job failed : " + job);
- }
-
- _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+ _logger.info("Pre-processing job is disabled.");
+ return;
}
@Nullable
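For context on what was removed: the old run() body hand-wired the full MapReduce pipeline. Below is a condensed, illustrative sketch assembled only from classes and calls that appear in the deleted lines; the wrapper class name, method signature, and parameters are invented for the example, and the original code additionally handled output-directory permissions, partition/sort/resize configs, dependency jars, and Hadoop tokens.

    import org.apache.avro.Schema;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyOutputFormat;
    import org.apache.avro.mapreduce.AvroMultipleOutputs;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
    import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
    import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;

    public class PreprocessingJobSketch {

      public static void runJob(Configuration conf, Path inputDir, Path outputDir,
          Schema inputSchema, Schema mapperOutputKeySchema, int numReduceTasks)
          throws Exception {
        Job job = Job.getInstance(conf);
        job.setJarByClass(PreprocessingJobSketch.class);

        // Mapper emits AvroKey/AvroValue pairs keyed by the partition/sort key schema.
        job.setMapperClass(SegmentPreprocessingMapper.class);
        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(AvroValue.class);

        // Reducer writes the records back out as Avro.
        job.setReducerClass(SegmentPreprocessingReducer.class);
        job.setOutputKeyClass(AvroKey.class);
        job.setOutputValueClass(NullWritable.class);

        // Named Avro output; LazyOutputFormat avoids creating empty part files.
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, inputSchema);
        AvroMultipleOutputs.setCountersEnabled(job, true);
        LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);

        // Combine small Avro files into fewer input splits.
        job.setInputFormatClass(CombineAvroKeyInputFormat.class);
        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setNumReduceTasks(numReduceTasks);

        // Register the Avro schemas on both sides of the shuffle.
        AvroJob.setInputKeySchema(job, inputSchema);
        AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
        AvroJob.setMapOutputValueSchema(job, inputSchema);
        AvroJob.setOutputKeySchema(job, inputSchema);

        job.waitForCompletion(true);
        if (!job.isSuccessful()) {
          throw new RuntimeException("Job failed: " + job);
        }
      }
    }

Pairing LazyOutputFormat with AvroKeyOutputFormat is how the removed code avoided creating empty output files for reducers that received no records.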