Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/27 22:35:03 UTC
[incubator-pinot] branch master updated: Disable Hadoop Pre-process (#4375)
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cc83174 Disable Hadoop Pre-process (#4375)
cc83174 is described below
commit cc8317438dd4edd6d41885dcf0471f04e42d4f64
Author: Jennifer Dai <je...@users.noreply.github.com>
AuthorDate: Thu Jun 27 15:34:57 2019 -0700
Disable Hadoop Pre-process (#4375)
* Disabling hadoop pre-process until some protective features are added
---
.../pinot/hadoop/job/SegmentPreprocessingJob.java | 171 +--------------------
...mentBuildPushOfflineClusterIntegrationTest.java | 87 +----------
2 files changed, 4 insertions(+), 254 deletions(-)
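
In short, SegmentPreprocessingJob.run() no longer configures and submits a MapReduce job; it now logs that pre-processing is disabled and returns immediately. A minimal sketch of the resulting method, based on the added lines in the diff below (the _logger field already exists on the class):

    public void run()
        throws Exception {
      // All partitioning, sorting, and resizing setup has been removed until
      // protective features are added back, so the job is a no-op for now.
      _logger.info("Pre-processing job is disabled.");
      return;
    }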
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index aadeb3b..a3f3901 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,8 +20,6 @@ package org.apache.pinot.hadoop.job;
import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -32,41 +30,21 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableCustomConfig;
-import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
-import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.JobPreparationHelper;
import org.apache.pinot.hadoop.utils.PushLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.mapreduce.MRJobConfig.*;
-import static org.apache.hadoop.security.UserGroupInformation.*;
-
/**
* A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format.
@@ -159,153 +137,8 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
public void run()
throws Exception {
- if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
- _logger.info("Pre-processing job is disabled.");
- return;
- } else {
- _logger.info("Starting {}", getClass().getSimpleName());
- }
-
- _fileSystem = FileSystem.get(_conf);
- final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
-
- if (_fileSystem.exists(_preprocessedOutputDir)) {
- _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
- _fileSystem.delete(_preprocessedOutputDir, true);
- }
- JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
-
- _tableConfig = getTableConfig();
- Preconditions.checkArgument(_tableConfig != null, "Table config shouldn't be null");
-
- if (_enablePartitioning) {
- fetchPartitioningConfig();
- _logger.info("partition.column: {}", _partitionColumn);
- _logger.info("num.partitions: {}", _numberOfPartitions);
- _logger.info("partition.function: {}", _partitionColumn);
- }
-
- if (_enableSorting) {
- fetchSortingConfig();
- _logger.info("sorted.column: {}", _sortedColumn);
- }
-
- if (_enableResizing) {
- fetchResizingConfig();
- _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
- }
-
- _logger.info("Initializing a pre-processing job");
- Job job = Job.getInstance(_conf);
-
- job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
- // Turn this on to always firstly use class paths that user specifies.
- job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
- // Turn this off since we don't need an empty file in the output directory
- job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
-
- job.setJarByClass(SegmentPreprocessingJob.class);
-
- String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
- if (hadoopTokenFileLocation != null) {
- job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
- }
-
- // Schema configs.
- Schema schema = getSchema(inputDataPath.get(0));
- _logger.info("Schema is: {}", schema.toString(true));
-
- // Validates configs against schema.
- validateConfigsAgainstSchema(schema);
-
- // Mapper configs.
- job.setMapperClass(SegmentPreprocessingMapper.class);
- job.setMapOutputKeyClass(AvroKey.class);
- job.setMapOutputValueClass(AvroValue.class);
- job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
-
- // Reducer configs.
- job.setReducerClass(SegmentPreprocessingReducer.class);
- job.setOutputKeyClass(AvroKey.class);
- job.setOutputValueClass(NullWritable.class);
-
- AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
- AvroMultipleOutputs.setCountersEnabled(job, true);
- // Use LazyOutputFormat to avoid creating empty files.
- LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-
- // Input and output paths.
- FileInputFormat.setInputPaths(job, _inputSegmentDir);
- FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
- _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
-
- // Set up mapper output key
- Set<Schema.Field> fieldSet = new HashSet<>();
-
- // Partition configs.
- int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
- if (_partitionColumn != null) {
- job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
- job.setPartitionerClass(GenericPartitioner.class);
- job.getConfiguration().set("partition.column", _partitionColumn);
- if (_partitionFunction != null) {
- job.getConfiguration().set("partition.function", _partitionFunction);
- }
- job.getConfiguration().set("num.partitions", Integer.toString(numReduceTasks));
- } else {
- if (_numberOfOutputFiles > 0) {
- numReduceTasks = _numberOfOutputFiles;
- }
- // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key.
- // so that all the rows can be spread evenly.
- addHashCodeField(fieldSet);
- }
- setMaxNumRecordsConfigIfSpecified(job);
- job.setInputFormatClass(CombineAvroKeyInputFormat.class);
-
- _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
- job.setNumReduceTasks(numReduceTasks);
-
- // Sort config.
- if (_sortedColumn != null) {
- _logger.info("Adding sorted column: {} to job config", _sortedColumn);
- job.getConfiguration().set("sorted.column", _sortedColumn);
-
- addSortedColumnField(schema, fieldSet);
- } else {
- // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
- addHashCodeField(fieldSet);
- }
-
- // Creates a wrapper for the schema of output key in mapper.
- Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
- mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
- _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
-
- AvroJob.setInputKeySchema(job, schema);
- AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
- AvroJob.setMapOutputValueSchema(job, schema);
- AvroJob.setOutputKeySchema(job, schema);
-
- // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to
- // distributed cache ourselves. Take a look at how the addFilesToDistributedCache is
- // implemented so that you know what it does.
- _logger.info("HDFS class path: " + _pathToDependencyJar);
- if (_pathToDependencyJar != null) {
- _logger.info("Copying jars locally.");
- JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
- } else {
- _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
- }
-
- long startTime = System.currentTimeMillis();
- // Submit the job for execution.
- job.waitForCompletion(true);
- if (!job.isSuccessful()) {
- throw new RuntimeException("Job failed : " + job);
- }
-
- _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+ _logger.info("Pre-processing job is disabled.");
+ return;
}
@Nullable
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 5bb9b65..bda936e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -19,48 +19,26 @@
package org.apache.pinot.integration.tests;
import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
-import org.apache.pinot.common.config.ColumnPartitionConfig;
-import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.core.data.partition.PartitionFunction;
-import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.hadoop.job.JobConfigConstants;
-import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;
import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
-
public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
- private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
@@ -88,7 +66,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Start the MR Yarn cluster
final Configuration conf = new Configuration();
- _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
+ _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1);
_mrCluster.init(conf);
_mrCluster.start();
@@ -115,7 +93,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Create the table
addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
- getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(), getSortedColumn());
+ getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), null, null);
// Generate and push Pinot segments from Hadoop
generateAndPushSegmentsFromHadoop();
@@ -176,25 +154,6 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost());
properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
- Properties preComputeProperties = new Properties();
- preComputeProperties.putAll(properties);
- preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
- preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
-
- preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath());
- preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath());
- properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath());
-
- // Run segment pre-processing job
- SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties);
- Configuration preComputeConfig = _mrCluster.getConfig();
- segmentPreprocessingJob.setConf(preComputeConfig);
- segmentPreprocessingJob.run();
- LOGGER.info("Segment preprocessing job finished.");
-
- // Verify partitioning and sorting.
- verifyPreprocessingJob(preComputeConfig);
-
// Run segment creation job
SegmentCreationJob creationJob = new SegmentCreationJob(properties);
Configuration config = _mrCluster.getConfig();
@@ -206,46 +165,4 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
pushJob.setConf(_mrCluster.getConfig());
pushJob.run();
}
-
- private void verifyPreprocessingJob(Configuration preComputeConfig) throws IOException {
- // Fetch partitioning config and sorting config.
- SegmentPartitionConfig segmentPartitionConfig = getSegmentPartitionConfig();
- Map.Entry<String, ColumnPartitionConfig>
- entry = segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
- String partitionColumn = entry.getKey();
- String partitionFunctionString = entry.getValue().getFunctionName();
- int numPartitions = entry.getValue().getNumPartitions();
- PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionString, numPartitions);
- String sortedColumn = getSortedColumn();
-
- // Get output files.
- FileSystem fileSystem = FileSystem.get(preComputeConfig);
- FileStatus[] fileStatuses = fileSystem.listStatus(new Path(_preprocessingDir.getPath()));
- Assert.assertEquals(fileStatuses.length, numPartitions, "Number of output file should be the same as the number of partitions.");
-
- Set<Integer> partitionIdSet = new HashSet<>();
- Object previousObject;
- for (FileStatus fileStatus : fileStatuses) {
- Path avroFile = fileStatus.getPath();
- DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>());
-
- // Reset hash set and previous object
- partitionIdSet.clear();
- previousObject = null;
- while (dataFileStream.hasNext()) {
- GenericRecord genericRecord = dataFileStream.next();
- partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn)));
- Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should be the same within a file.");
- org.apache.avro.Schema sortedColumnSchema = genericRecord.getSchema().getField(sortedColumn).schema();
- Object currentObject = genericRecord.get(sortedColumn);
- if (previousObject == null) {
- previousObject = currentObject;
- continue;
- }
- // The values of sorted column should be sorted in ascending order.
- Assert.assertTrue(GenericData.get().compare(previousObject, currentObject, sortedColumnSchema) <= 0);
- previousObject = currentObject;
- }
- }
- }
}
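
For readers skimming the diff, the removed test code above wired the pre-processing job into the integration test roughly as follows. This is a condensed sketch of the deleted wiring, kept only for reference; the fields and constants (preComputeProperties, _avroDir, _preprocessingDir, _mrCluster, verifyPreprocessingJob) are the ones that appear in the removed lines, not new API:

    // Layer pre-processing settings on top of the base job properties.
    Properties preComputeProperties = new Properties();
    preComputeProperties.putAll(properties);
    preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
    preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
    preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath());
    preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath());
    // Point the segment creation job at the pre-processed output instead of the raw Avro input.
    properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath());

    // Run the pre-processing job on the MiniMRYarnCluster, then verify that each output
    // file holds a single partition id and that the sorted column is in ascending order.
    SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties);
    segmentPreprocessingJob.setConf(_mrCluster.getConfig());
    segmentPreprocessingJob.run();
    verifyPreprocessingJob(_mrCluster.getConfig());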