You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/27 21:04:17 UTC

[incubator-pinot] 01/01: Disable preprocess

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch disablePreprocess
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f4fdc7639c52fc4a039dd44632fd1c6196580851
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Thu Jun 27 14:04:03 2019 -0700

    Disable preprocess
---
 .../pinot/hadoop/job/SegmentPreprocessingJob.java  | 171 +--------------------
 1 file changed, 2 insertions(+), 169 deletions(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index aadeb3b..a3f3901 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,8 +20,6 @@ package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -32,41 +30,21 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableCustomConfig;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.JobPreparationHelper;
 import org.apache.pinot.hadoop.utils.PushLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.mapreduce.MRJobConfig.*;
-import static org.apache.hadoop.security.UserGroupInformation.*;
-
 
 /**
  * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
@@ -159,153 +137,8 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
 
   public void run()
       throws Exception {
-    if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
-      _logger.info("Pre-processing job is disabled.");
-      return;
-    } else {
-      _logger.info("Starting {}", getClass().getSimpleName());
-    }
-
-    _fileSystem = FileSystem.get(_conf);
-    final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
-
-    if (_fileSystem.exists(_preprocessedOutputDir)) {
-      _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
-      _fileSystem.delete(_preprocessedOutputDir, true);
-    }
-    JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
-
-    _tableConfig = getTableConfig();
-    Preconditions.checkArgument(_tableConfig != null, "Table config shouldn't be null");
-
-    if (_enablePartitioning) {
-      fetchPartitioningConfig();
-      _logger.info("partition.column: {}", _partitionColumn);
-      _logger.info("num.partitions: {}", _numberOfPartitions);
-      _logger.info("partition.function: {}", _partitionColumn);
-    }
-
-    if (_enableSorting) {
-      fetchSortingConfig();
-      _logger.info("sorted.column: {}", _sortedColumn);
-    }
-
-    if (_enableResizing) {
-      fetchResizingConfig();
-      _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
-    }
-
-    _logger.info("Initializing a pre-processing job");
-    Job job = Job.getInstance(_conf);
-
-    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
-    // Turn this on to always firstly use class paths that user specifies.
-    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
-    // Turn this off since we don't need an empty file in the output directory
-    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
-    job.setJarByClass(SegmentPreprocessingJob.class);
-
-    String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
-    if (hadoopTokenFileLocation != null) {
-      job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
-    }
-
-    // Schema configs.
-    Schema schema = getSchema(inputDataPath.get(0));
-    _logger.info("Schema is: {}", schema.toString(true));
-
-    // Validates configs against schema.
-    validateConfigsAgainstSchema(schema);
-
-    // Mapper configs.
-    job.setMapperClass(SegmentPreprocessingMapper.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-    job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
-
-    // Reducer configs.
-    job.setReducerClass(SegmentPreprocessingReducer.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
-    // Input and output paths.
-    FileInputFormat.setInputPaths(job, _inputSegmentDir);
-    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
-    _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
-
-    // Set up mapper output key
-    Set<Schema.Field> fieldSet = new HashSet<>();
-
-    // Partition configs.
-    int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
-    if (_partitionColumn != null) {
-      job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
-      job.setPartitionerClass(GenericPartitioner.class);
-      job.getConfiguration().set("partition.column", _partitionColumn);
-      if (_partitionFunction != null) {
-        job.getConfiguration().set("partition.function", _partitionFunction);
-      }
-      job.getConfiguration().set("num.partitions", Integer.toString(numReduceTasks));
-    } else {
-      if (_numberOfOutputFiles > 0) {
-        numReduceTasks = _numberOfOutputFiles;
-      }
-      // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
-      // so that all the rows can be spread evenly.
-      addHashCodeField(fieldSet);
-    }
-    setMaxNumRecordsConfigIfSpecified(job);
-    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
-    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
-    job.setNumReduceTasks(numReduceTasks);
-
-    // Sort config.
-    if (_sortedColumn != null) {
-      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
-      job.getConfiguration().set("sorted.column", _sortedColumn);
-
-      addSortedColumnField(schema, fieldSet);
-    } else {
-      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
-      addHashCodeField(fieldSet);
-    }
-
-    // Creates a wrapper for the schema of output key in mapper.
-    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
-    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
-    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
-
-    AvroJob.setInputKeySchema(job, schema);
-    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
-    AvroJob.setMapOutputValueSchema(job, schema);
-    AvroJob.setOutputKeySchema(job, schema);
-
-    // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
-    // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
-    // implemented so that you know what it does.
-    _logger.info("HDFS class path: " + _pathToDependencyJar);
-    if (_pathToDependencyJar != null) {
-      _logger.info("Copying jars locally.");
-      JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
-    } else {
-      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
-    }
-
-    long startTime = System.currentTimeMillis();
-    // Submit the job for execution.
-    job.waitForCompletion(true);
-    if (!job.isSuccessful()) {
-      throw new RuntimeException("Job failed : " + job);
-    }
-
-    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+    _logger.info("Pre-processing job is disabled.");
+    return;
   }
 
   @Nullable


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org