You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/27 21:04:16 UTC

[incubator-pinot] branch disablePreprocess created (now f4fdc76)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch disablePreprocess
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at f4fdc76  Disable preprocess

This branch includes the following new commits:

     new f4fdc76  Disable preprocess

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Disable preprocess

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch disablePreprocess
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f4fdc7639c52fc4a039dd44632fd1c6196580851
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Thu Jun 27 14:04:03 2019 -0700

    Disable preprocess
---
 .../pinot/hadoop/job/SegmentPreprocessingJob.java  | 171 +--------------------
 1 file changed, 2 insertions(+), 169 deletions(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index aadeb3b..a3f3901 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,8 +20,6 @@ package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -32,41 +30,21 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableCustomConfig;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.JobPreparationHelper;
 import org.apache.pinot.hadoop.utils.PushLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.mapreduce.MRJobConfig.*;
-import static org.apache.hadoop.security.UserGroupInformation.*;
-
 
 /**
  * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
@@ -159,153 +137,8 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
 
   public void run()
       throws Exception {
-    if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
-      _logger.info("Pre-processing job is disabled.");
-      return;
-    } else {
-      _logger.info("Starting {}", getClass().getSimpleName());
-    }
-
-    _fileSystem = FileSystem.get(_conf);
-    final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
-
-    if (_fileSystem.exists(_preprocessedOutputDir)) {
-      _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
-      _fileSystem.delete(_preprocessedOutputDir, true);
-    }
-    JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
-
-    _tableConfig = getTableConfig();
-    Preconditions.checkArgument(_tableConfig != null, "Table config shouldn't be null");
-
-    if (_enablePartitioning) {
-      fetchPartitioningConfig();
-      _logger.info("partition.column: {}", _partitionColumn);
-      _logger.info("num.partitions: {}", _numberOfPartitions);
-      _logger.info("partition.function: {}", _partitionColumn);
-    }
-
-    if (_enableSorting) {
-      fetchSortingConfig();
-      _logger.info("sorted.column: {}", _sortedColumn);
-    }
-
-    if (_enableResizing) {
-      fetchResizingConfig();
-      _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
-    }
-
-    _logger.info("Initializing a pre-processing job");
-    Job job = Job.getInstance(_conf);
-
-    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
-    // Turn this on to always firstly use class paths that user specifies.
-    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
-    // Turn this off since we don't need an empty file in the output directory
-    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
-    job.setJarByClass(SegmentPreprocessingJob.class);
-
-    String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
-    if (hadoopTokenFileLocation != null) {
-      job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
-    }
-
-    // Schema configs.
-    Schema schema = getSchema(inputDataPath.get(0));
-    _logger.info("Schema is: {}", schema.toString(true));
-
-    // Validates configs against schema.
-    validateConfigsAgainstSchema(schema);
-
-    // Mapper configs.
-    job.setMapperClass(SegmentPreprocessingMapper.class);
-    job.setMapOutputKeyClass(AvroKey.class);
-    job.setMapOutputValueClass(AvroValue.class);
-    job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
-
-    // Reducer configs.
-    job.setReducerClass(SegmentPreprocessingReducer.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
-    // Input and output paths.
-    FileInputFormat.setInputPaths(job, _inputSegmentDir);
-    FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
-    _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
-
-    // Set up mapper output key
-    Set<Schema.Field> fieldSet = new HashSet<>();
-
-    // Partition configs.
-    int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
-    if (_partitionColumn != null) {
-      job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
-      job.setPartitionerClass(GenericPartitioner.class);
-      job.getConfiguration().set("partition.column", _partitionColumn);
-      if (_partitionFunction != null) {
-        job.getConfiguration().set("partition.function", _partitionFunction);
-      }
-      job.getConfiguration().set("num.partitions", Integer.toString(numReduceTasks));
-    } else {
-      if (_numberOfOutputFiles > 0) {
-        numReduceTasks = _numberOfOutputFiles;
-      }
-      // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
-      // so that all the rows can be spread evenly.
-      addHashCodeField(fieldSet);
-    }
-    setMaxNumRecordsConfigIfSpecified(job);
-    job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
-    _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
-    job.setNumReduceTasks(numReduceTasks);
-
-    // Sort config.
-    if (_sortedColumn != null) {
-      _logger.info("Adding sorted column: {} to job config", _sortedColumn);
-      job.getConfiguration().set("sorted.column", _sortedColumn);
-
-      addSortedColumnField(schema, fieldSet);
-    } else {
-      // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
-      addHashCodeField(fieldSet);
-    }
-
-    // Creates a wrapper for the schema of output key in mapper.
-    Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
-    mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
-    _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
-
-    AvroJob.setInputKeySchema(job, schema);
-    AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
-    AvroJob.setMapOutputValueSchema(job, schema);
-    AvroJob.setOutputKeySchema(job, schema);
-
-    // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
-    // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
-    // implemented so that you know what it does.
-    _logger.info("HDFS class path: " + _pathToDependencyJar);
-    if (_pathToDependencyJar != null) {
-      _logger.info("Copying jars locally.");
-      JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
-    } else {
-      _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
-    }
-
-    long startTime = System.currentTimeMillis();
-    // Submit the job for execution.
-    job.waitForCompletion(true);
-    if (!job.isSuccessful()) {
-      throw new RuntimeException("Job failed : " + job);
-    }
-
-    _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+    _logger.info("Pre-processing job is disabled.");
+    return;
   }
 
   @Nullable


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org