Posted to commits@pinot.apache.org by je...@apache.org on 2019/06/27 18:52:25 UTC
[incubator-pinot] branch master updated: Add segment pre-processing Hadoop job (#4253)
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 89a3449 Add segment pre-processing Hadoop job (#4253)
89a3449 is described below
commit 89a344998bce7aeaeda8a4a49d5df175bdb82185
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu Jun 27 11:52:21 2019 -0700
Add segment pre-processing Hadoop job (#4253)
* Adds partitioning, sorting and resizing functionalities before Hadoop Pinot segment generation
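The launcher runs this step as a new SegmentPreprocessing job type, invoked as "[job_type] [job.properties]" per the launcher's usage string. A minimal job.properties sketch, assuming illustrative paths (only the enable.* flags, path.to.input, preprocess.path.to.output, and path.to.deps.jar keys below are taken from JobConfigConstants in this change; the table name and push host/port keys are also required but are not shown here):

    # Sketch only: values are illustrative, not from the commit.
    enable.partitioning=true
    enable.sorting=true
    enable.resizing=false
    path.to.input=/data/rawdata/myTable
    preprocess.path.to.output=/data/preprocessed/myTable
    path.to.deps.jar=/deps/jars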
---
.../apache/pinot/common/config/TableConfig.java | 8 +-
.../data/partition/MurmurPartitionFunction.java | 2 +-
pinot-hadoop/pom.xml | 5 +
.../pinot/hadoop/PinotHadoopJobLauncher.java | 7 +-
.../pinot/hadoop/io/CombineAvroKeyInputFormat.java | 57 +++
.../pinot/hadoop/job/JobConfigConstants.java | 5 +
.../pinot/hadoop/job/SegmentCreationJob.java | 33 +-
.../pinot/hadoop/job/SegmentPreprocessingJob.java | 483 +++++++++++++++++++++
.../{mapper => mappers}/SegmentCreationMapper.java | 2 +-
.../job/mappers/SegmentPreprocessingMapper.java | 85 ++++
.../job/partitioners/GenericPartitioner.java | 63 +++
.../job/partitioners/PartitionFunctionFactory.java | 98 +++++
.../job/reducers/SegmentPreprocessingReducer.java | 84 ++++
.../pinot/hadoop/utils/JobPreparationHelper.java | 58 +++
.../tests/BaseClusterIntegrationTest.java | 15 +
.../pinot/integration/tests/ClusterTest.java | 23 +-
.../ControllerPeriodicTasksIntegrationTests.java | 4 +-
...mentBuildPushOfflineClusterIntegrationTest.java | 87 +++-
.../tests/HybridClusterIntegrationTest.java | 2 +-
...ridClusterIntegrationTestCommandLineRunner.java | 2 +-
.../tests/OfflineClusterIntegrationTest.java | 6 +-
.../tests/SimpleMinionClusterIntegrationTest.java | 6 +-
pom.xml | 6 +
23 files changed, 1087 insertions(+), 54 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
index 2f4ee1c..502a7a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
@@ -457,6 +457,7 @@ public class TableConfig {
private TableTaskConfig _taskConfig;
private RoutingConfig _routingConfig;
private HllConfig _hllConfig;
+ private SegmentPartitionConfig _segmentPartitionConfig;
private StarTreeIndexSpec _starTreeIndexSpec;
public Builder(TableType tableType) {
@@ -609,6 +610,11 @@ public class TableConfig {
return this;
}
+ public Builder setSegmentPartitionConfig(SegmentPartitionConfig segmentPartitionConfig) {
+ _segmentPartitionConfig = segmentPartitionConfig;
+ return this;
+ }
+
public TableConfig build() {
// Validation config
SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
@@ -646,7 +652,7 @@ public class TableConfig {
StreamConsumptionConfig streamConsumptionConfig = new StreamConsumptionConfig();
streamConsumptionConfig.setStreamPartitionAssignmentStrategy(_streamPartitionAssignmentStrategy);
indexingConfig.setStreamConsumptionConfig(streamConsumptionConfig);
- // TODO: set SegmentPartitionConfig here
+ indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
if (_customConfig == null) {
_customConfig = new TableCustomConfig();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java
index fd522fb..24da4dd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/MurmurPartitionFunction.java
@@ -35,7 +35,7 @@ public class MurmurPartitionFunction implements PartitionFunction {
* @param numPartitions Number of partitions.
*/
public MurmurPartitionFunction(int numPartitions) {
- Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0, specified", numPartitions);
+ Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
_numPartitions = numPartitions;
}
diff --git a/pinot-hadoop/pom.xml b/pinot-hadoop/pom.xml
index cfcfc05..f6c2b55 100644
--- a/pinot-hadoop/pom.xml
+++ b/pinot-hadoop/pom.xml
@@ -118,6 +118,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
index 2c7b0da..9f59d6d 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/PinotHadoopJobLauncher.java
@@ -21,6 +21,7 @@ package org.apache.pinot.hadoop;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.Properties;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;
import org.apache.pinot.hadoop.job.SegmentUriPushJob;
@@ -29,7 +30,8 @@ import org.apache.pinot.hadoop.job.SegmentUriPushJob;
public class PinotHadoopJobLauncher {
enum PinotHadoopJobType {
- SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush
+ SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush,
+ SegmentPreprocessing
}
private static final String USAGE = "usage: [job_type] [job.properties]";
@@ -61,6 +63,9 @@ public class PinotHadoopJobLauncher {
new SegmentCreationJob(jobConf).run();
new SegmentUriPushJob(jobConf).run();
break;
+ case SegmentPreprocessing:
+ new SegmentPreprocessingJob(jobConf).run();
+ break;
default:
throw new RuntimeException("Not a valid jobType - " + jobType);
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/CombineAvroKeyInputFormat.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/CombineAvroKeyInputFormat.java
new file mode 100644
index 0000000..a36f8db
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/io/CombineAvroKeyInputFormat.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.io;
+
+import java.io.IOException;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+
+/**
+ * Create Hadoop splits across files.
+ * By default, Hadoop cannot create splits that span files. As a result, if the input contains many small Avro files,
+ * the job ends up with many short-running mappers, which is inefficient.
+ * This class allows splits to span Avro files so that we get larger splits and fewer, longer-running mappers,
+ * which improves the performance of the Hadoop job.
+ */
+public class CombineAvroKeyInputFormat<T> extends CombineFileInputFormat<AvroKey<T>, NullWritable> {
+ @Override
+ public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ Class cls = AvroKeyRecordReaderWrapper.class;
+
+ return new CombineFileRecordReader<>((CombineFileSplit) split, context,
+ (Class<? extends RecordReader<AvroKey<T>, NullWritable>>) cls);
+ }
+
+ public static class AvroKeyRecordReaderWrapper<T> extends CombineFileRecordReaderWrapper<AvroKey<T>, NullWritable> {
+ public AvroKeyRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
+ throws IOException, InterruptedException {
+ super(new AvroKeyInputFormat<>(), split, context, index);
+ }
+ }
+}
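A minimal sketch of how a driver can wire this input format into a job, assuming an already-configured Job; the split-size cap and its 256 MB value are illustrative assumptions (the preprocessing job below only sets the input format class):

    // Sketch only: the split-size value is an assumption, not from this commit.
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;

    public class CombineInputFormatWiring {
      public static void configure(Job job) {
        // Combine many small Avro files into fewer, larger splits.
        job.setInputFormatClass(CombineAvroKeyInputFormat.class);
        // Cap each combined split at roughly 256 MB (illustrative value).
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
      }
    }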
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 35557b5..19b350c 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -21,6 +21,7 @@ package org.apache.pinot.hadoop.job;
public class JobConfigConstants {
public static final String PATH_TO_INPUT = "path.to.input";
public static final String PATH_TO_OUTPUT = "path.to.output";
+ public static final String PREPROCESS_PATH_TO_OUTPUT = "preprocess.path.to.output";
public static final String PATH_TO_DEPS_JAR = "path.to.deps.jar";
public static final String PATH_TO_READER_CONFIG = "path.to.reader.config";
// Leave this for backward compatibility. We prefer to use the schema fetched from the controller.
@@ -52,4 +53,8 @@ public class JobConfigConstants {
// The path to the record reader to be configured
public static final String RECORD_READER_PATH = "record.reader.path";
+
+ public static final String ENABLE_PARTITIONING = "enable.partitioning";
+ public static final String ENABLE_SORTING = "enable.sorting";
+ public static final String ENABLE_RESIZING = "enable.resizing";
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index 997dfec..b4e56ef 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobContext;
@@ -46,7 +45,8 @@ import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.hadoop.job.mapper.SegmentCreationMapper;
+import org.apache.pinot.hadoop.job.mappers.SegmentCreationMapper;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
import org.apache.pinot.hadoop.utils.PushLocation;
@@ -202,16 +202,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
}
_logger.info("Making directory: {}", dirPath);
_fileSystem.mkdirs(dirPath);
- setDirPermission(dirPath);
- }
-
- protected void setDirPermission(Path dirPath)
- throws IOException {
- if (_defaultPermissionsMask != null) {
- FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(_defaultPermissionsMask));
- _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
- _fileSystem.setPermission(dirPath, permission);
- }
+ JobPreparationHelper.setDirPermission(_fileSystem, dirPath, _defaultPermissionsMask);
}
@Nullable
@@ -263,23 +254,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
protected void addDepsJarToDistributedCache(Job job)
throws IOException {
if (_depsJarDir != null) {
- addDepsJarToDistributedCacheHelper(job, _depsJarDir);
- }
- }
-
- protected void addDepsJarToDistributedCacheHelper(Job job, Path depsJarDir)
- throws IOException {
- FileStatus[] fileStatuses = _fileSystem.listStatus(depsJarDir);
- for (FileStatus fileStatus : fileStatuses) {
- if (fileStatus.isDirectory()) {
- addDepsJarToDistributedCacheHelper(job, fileStatus.getPath());
- } else {
- Path depJarPath = fileStatus.getPath();
- if (depJarPath.getName().endsWith(".jar")) {
- _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
- job.addCacheArchive(depJarPath.toUri());
- }
- }
+ JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _depsJarDir);
}
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
new file mode 100644
index 0000000..aadeb3b
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableCustomConfig;
+import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
+import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
+import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
+import org.apache.pinot.hadoop.utils.PushLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+import static org.apache.hadoop.security.UserGroupInformation.*;
+
+
+/**
+ * A Hadoop job that partitions, sorts, and resizes the input files, which are raw data in Avro format.
+ * After this job, the output files are partitioned, sorted, and resized accordingly.
+ * In order to run this job, the following configs need to be specified (the enable.* flags in the job properties; the thresholds in the table's custom config):
+ * * enable.partitioning: false by default. If enabled, this job will fetch the partition config from the table config.
+ * * enable.sorting: false by default. If enabled, the job will fetch the sorted column from the table config.
+ * * enable.resizing: false by default. Forced to be disabled if partitioning is enabled. If enabled, min.num.output.files is required.
+ * * min.num.output.files: null by default. Required if resizing is enabled; read from the table's custom config.
+ * * max.num.records: null by default. Read from the table's custom config; an output file is split into multiple files once its number of records exceeds max.num.records.
+ */
+public class SegmentPreprocessingJob extends BaseSegmentJob {
+ private static final Logger _logger = LoggerFactory.getLogger(SegmentPreprocessingJob.class);
+
+ private boolean _enablePartitioning = false;
+ private boolean _enableSorting = false;
+ private boolean _enableResizing = false;
+
+ private String _partitionColumn;
+ private int _numberOfPartitions;
+ private String _partitionFunction;
+ private String _sortedColumn;
+ private int _numberOfOutputFiles;
+
+ private final Path _inputSegmentDir;
+ private final Path _preprocessedOutputDir;
+ protected final String _rawTableName;
+ protected final List<PushLocation> _pushLocations;
+
+ // Optional.
+ private final Path _pathToDependencyJar;
+ private final String _defaultPermissionsMask;
+
+ private TableConfig _tableConfig;
+ protected FileSystem _fileSystem;
+
+ public SegmentPreprocessingJob(final Properties properties) {
+ super(properties);
+
+ if (_properties.getProperty(JobConfigConstants.ENABLE_PARTITIONING) != null) {
+ _enablePartitioning = Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_PARTITIONING));
+ }
+
+ if (_properties.getProperty(JobConfigConstants.ENABLE_SORTING) != null) {
+ _enableSorting = Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_SORTING));
+ }
+
+ boolean resizingForcedDisabled = false;
+ if (_properties.getProperty(JobConfigConstants.ENABLE_RESIZING) != null) {
+ if (!_enablePartitioning) {
+ _enableResizing = Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_RESIZING));
+ } else {
+ resizingForcedDisabled = true;
+ _logger.warn("Resizing cannot be enabled since partitioning has already been enabled!");
+ }
+ }
+
+ // get input/output paths.
+ _inputSegmentDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT));
+ _preprocessedOutputDir =
+ Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT));
+ _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
+
+ _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
+ _defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK, null);
+
+ // Push location and table parameters, used to fetch the table config and schema from the push hosts (required for this job).
+ String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
+ String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
+ if (pushHostsString != null && pushPortString != null) {
+ _pushLocations =
+ PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
+ } else {
+ throw new RuntimeException(String
+ .format("Push location is mis-configured! %s: %s, %s: %s", JobConfigConstants.PUSH_TO_HOSTS, pushHostsString,
+ JobConfigConstants.PUSH_TO_PORT, pushPortString));
+ }
+
+ _logger.info("*********************************************************************");
+ _logger.info("enable.partitioning: {}", _enablePartitioning);
+ _logger.info("enable.sorting: {}", _enableSorting);
+ _logger.info("enable.resizing: {} {}", _enableResizing,
+ (resizingForcedDisabled ? "(forced to be disabled since partitioning is enabled)" : ""));
+ _logger.info("path.to.input: {}", _inputSegmentDir);
+ _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
+ _logger.info("path.to.deps.jar: {}", _pathToDependencyJar);
+ _logger.info("push.locations: {}", _pushLocations);
+ _logger.info("*********************************************************************");
+ }
+
+ public void run()
+ throws Exception {
+ if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
+ _logger.info("Pre-processing job is disabled.");
+ return;
+ } else {
+ _logger.info("Starting {}", getClass().getSimpleName());
+ }
+
+ _fileSystem = FileSystem.get(_conf);
+ final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
+
+ if (_fileSystem.exists(_preprocessedOutputDir)) {
+ _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir);
+ _fileSystem.delete(_preprocessedOutputDir, true);
+ }
+ JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask);
+
+ _tableConfig = getTableConfig();
+ Preconditions.checkArgument(_tableConfig != null, "Table config shouldn't be null");
+
+ if (_enablePartitioning) {
+ fetchPartitioningConfig();
+ _logger.info("partition.column: {}", _partitionColumn);
+ _logger.info("num.partitions: {}", _numberOfPartitions);
+ _logger.info("partition.function: {}", _partitionColumn);
+ }
+
+ if (_enableSorting) {
+ fetchSortingConfig();
+ _logger.info("sorted.column: {}", _sortedColumn);
+ }
+
+ if (_enableResizing) {
+ fetchResizingConfig();
+ _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
+ }
+
+ _logger.info("Initializing a pre-processing job");
+ Job job = Job.getInstance(_conf);
+
+ job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+ // Turn this on so that the class paths specified by the user are always used first.
+ job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+ // Turn this off since we don't need an empty file in the output directory
+ job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+ job.setJarByClass(SegmentPreprocessingJob.class);
+
+ String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+ if (hadoopTokenFileLocation != null) {
+ job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+ }
+
+ // Schema configs.
+ Schema schema = getSchema(inputDataPath.get(0));
+ _logger.info("Schema is: {}", schema.toString(true));
+
+ // Validates configs against schema.
+ validateConfigsAgainstSchema(schema);
+
+ // Mapper configs.
+ job.setMapperClass(SegmentPreprocessingMapper.class);
+ job.setMapOutputKeyClass(AvroKey.class);
+ job.setMapOutputValueClass(AvroValue.class);
+ job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
+
+ // Reducer configs.
+ job.setReducerClass(SegmentPreprocessingReducer.class);
+ job.setOutputKeyClass(AvroKey.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
+ AvroMultipleOutputs.setCountersEnabled(job, true);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+
+ // Input and output paths.
+ FileInputFormat.setInputPaths(job, _inputSegmentDir);
+ FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+ _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size());
+
+ // Set up mapper output key
+ Set<Schema.Field> fieldSet = new HashSet<>();
+
+ // Partition configs.
+ int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions : inputDataPath.size();
+ if (_partitionColumn != null) {
+ job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true");
+ job.setPartitionerClass(GenericPartitioner.class);
+ job.getConfiguration().set("partition.column", _partitionColumn);
+ if (_partitionFunction != null) {
+ job.getConfiguration().set("partition.function", _partitionFunction);
+ }
+ job.getConfiguration().set("num.partitions", Integer.toString(numReduceTasks));
+ } else {
+ if (_numberOfOutputFiles > 0) {
+ numReduceTasks = _numberOfOutputFiles;
+ }
+ // Partitioning is disabled. Add hashcode as one of the fields in the mapper output key
+ // so that all the rows can be spread evenly across reducers.
+ addHashCodeField(fieldSet);
+ }
+ setMaxNumRecordsConfigIfSpecified(job);
+ job.setInputFormatClass(CombineAvroKeyInputFormat.class);
+
+ _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+ job.setNumReduceTasks(numReduceTasks);
+
+ // Sort config.
+ if (_sortedColumn != null) {
+ _logger.info("Adding sorted column: {} to job config", _sortedColumn);
+ job.getConfiguration().set("sorted.column", _sortedColumn);
+
+ addSortedColumnField(schema, fieldSet);
+ } else {
+ // If sorting is disabled, hashcode will be the only factor for sort/group comparator.
+ addHashCodeField(fieldSet);
+ }
+
+ // Creates a wrapper for the schema of output key in mapper.
+ Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false);
+ mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
+ _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+
+ AvroJob.setInputKeySchema(job, schema);
+ AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
+ AvroJob.setMapOutputValueSchema(job, schema);
+ AvroJob.setOutputKeySchema(job, schema);
+
+ // Since we aren't extending AbstractHadoopJob, we need to add the job's jars to the
+ // distributed cache ourselves; see JobPreparationHelper.addDepsJarToDistributedCacheHelper
+ // for how this is done.
+ _logger.info("HDFS class path: " + _pathToDependencyJar);
+ if (_pathToDependencyJar != null) {
+ _logger.info("Copying jars locally.");
+ JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar);
+ } else {
+ _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR);
+ }
+
+ long startTime = System.currentTimeMillis();
+ // Submit the job for execution.
+ job.waitForCompletion(true);
+ if (!job.isSuccessful()) {
+ throw new RuntimeException("Job failed : " + job);
+ }
+
+ _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime));
+ }
+
+ @Nullable
+ private TableConfig getTableConfig()
+ throws IOException {
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
+ }
+ }
+
+ /**
+ * Returns the controller REST API client if push locations are configured, null otherwise.
+ */
+ @Nullable
+ private ControllerRestApi getControllerRestApi() {
+ return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
+ }
+
+ private void fetchPartitioningConfig() {
+ // Fetch partition info from table config.
+ SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+ if (segmentPartitionConfig != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+ Preconditions.checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table.");
+ if (columnPartitionMap.size() == 1) {
+ _partitionColumn = columnPartitionMap.keySet().iterator().next();
+ _numberOfPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
+ _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+ }
+ } else {
+ _logger.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+ }
+ }
+
+ private void fetchSortingConfig() {
+ // Fetch sorting info from table config.
+ IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+ List<String> sortedColumns = indexingConfig.getSortedColumn();
+ Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
+ if (sortedColumns.size() == 1) {
+ _sortedColumn = sortedColumns.get(0);
+ }
+ }
+
+ private void fetchResizingConfig() {
+ TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+ if (tableCustomConfig == null) {
+ _numberOfOutputFiles = 0;
+ return;
+ }
+ Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
+ if (customConfigsMap != null && customConfigsMap.containsKey("min.num.output.files")) {
+ _numberOfOutputFiles = Integer.parseInt(customConfigsMap.get("min.num.output.files"));
+ Preconditions.checkArgument(_numberOfOutputFiles > 0, String.format("The value of min.num.output.files should be positive! Current value: %s", customConfigsMap.get("min.num.output.files")));
+ } else {
+ _numberOfOutputFiles = 0;
+ }
+ }
+
+ private void setMaxNumRecordsConfigIfSpecified(Job job) {
+ TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+ if (tableCustomConfig == null) {
+ return;
+ }
+ Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs();
+ if (customConfigsMap != null && customConfigsMap.containsKey("max.num.records")) {
+ int maxNumRecords = Integer.parseInt(customConfigsMap.get("max.num.records"));
+ Preconditions.checkArgument(maxNumRecords > 0, "The value of max.num.records should be positive. Current value: " + customConfigsMap.get("max.num.records"));
+ _logger.info("Setting max.num.records to {}", maxNumRecords);
+ job.getConfiguration().set("max.num.records", Integer.toString(maxNumRecords));
+ }
+ }
+
+ /**
+ * Finds the first Avro file in the input folder and returns its Avro schema.
+ * @param inputPathDir Path to input directory
+ * @return Input schema
+ * @throws IOException when the input directory cannot be read
+ */
+ private static Schema getSchema(Path inputPathDir)
+ throws IOException {
+ FileSystem fs = FileSystem.get(new Configuration());
+ Schema avroSchema = null;
+ for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
+ if (fileStatus.isFile() && fileStatus.getPath().getName().endsWith(".avro")) {
+ _logger.info("Extracting schema from " + fileStatus.getPath());
+ avroSchema = extractSchemaFromAvro(fileStatus.getPath());
+ break;
+ }
+ }
+ return avroSchema;
+ }
+
+ /**
+ * Extracts the Avro schema from an Avro file.
+ * @param avroFile Input avro file
+ * @return Schema in avro file
+ * @throws IOException when the file cannot be read
+ */
+ private static Schema extractSchemaFromAvro(Path avroFile)
+ throws IOException {
+ DataFileStream<GenericRecord> dataStreamReader = getAvroReader(avroFile);
+ Schema avroSchema = dataStreamReader.getSchema();
+ dataStreamReader.close();
+ return avroSchema;
+ }
+
+ /**
+ * Helper method that returns an Avro reader for the given Avro file.
+ * If the file name ends in 'gz', returns a GZIP-wrapped reader; otherwise returns the regular reader.
+ *
+ * @param avroFile File to read
+ * @return Avro reader for the file.
+ * @throws IOException when the file cannot be opened
+ */
+ private static DataFileStream<GenericRecord> getAvroReader(Path avroFile)
+ throws IOException {
+ FileSystem fs = FileSystem.get(new Configuration());
+ if (avroFile.getName().endsWith("gz")) {
+ return new DataFileStream<>(new GZIPInputStream(fs.open(avroFile)), new GenericDatumReader<>());
+ } else {
+ return new DataFileStream<>(fs.open(avroFile), new GenericDatumReader<>());
+ }
+ }
+
+ private void addSortedColumnField(Schema schema, Set<Schema.Field> fieldSet) {
+ // Sorting is enabled. Adding sorted column value to mapper output key.
+ Schema sortedColumnSchema = schema.getField(_sortedColumn).schema();
+ Schema sortedColumnAsKeySchema;
+ if (sortedColumnSchema.getType().equals(Schema.Type.UNION)) {
+ sortedColumnAsKeySchema = Schema.createUnion(sortedColumnSchema.getTypes());
+ } else if (sortedColumnSchema.getType().equals(Schema.Type.ARRAY)) {
+ sortedColumnAsKeySchema = Schema.createArray(sortedColumnSchema.getElementType());
+ } else {
+ sortedColumnAsKeySchema = Schema.create(sortedColumnSchema.getType());
+ }
+ Schema.Field columnField = new Schema.Field(_sortedColumn, sortedColumnAsKeySchema, "sortedColumn", null);
+ fieldSet.add(columnField);
+ }
+
+ private void validateConfigsAgainstSchema(Schema schema) {
+ if (_enablePartitioning) {
+ Preconditions.checkArgument(_partitionColumn != null, "Partition column should not be null!");
+ Preconditions.checkArgument(schema.getField(_partitionColumn) != null, String
+ .format("Partition column: %s is not found from the schema of input files.", _partitionColumn));
+ Preconditions.checkArgument(_numberOfPartitions > 0, String.format("Number of partitions should be positive. Current value: %s", _numberOfPartitions));
+ Preconditions.checkArgument(_partitionFunction != null, "Partition function should not be null!");
+ }
+ if (_enableSorting) {
+ Preconditions.checkArgument(_sortedColumn != null, "Sorted column should not be null!");
+ Preconditions.checkArgument(schema.getField(_sortedColumn) != null, String
+ .format("Sorted column: %s is not found from the schema of input files.", _sortedColumn));
+ }
+ }
+
+ private void addHashCodeField(Set<Schema.Field> fieldSet) {
+ Schema.Field hashCodeField = new Schema.Field("hashcode", Schema.create(Schema.Type.INT), "hashcode", null);
+ fieldSet.add(hashCodeField);
+ }
+
+ @Override
+ protected boolean isDataFile(String fileName) {
+ // TODO: support orc format in the future.
+ return fileName.endsWith(".avro");
+ }
+
+ void cleanup() {
+ _logger.info("Clean up pre-processing output path: {}", _preprocessedOutputDir);
+ try {
+ _fileSystem.delete(_preprocessedOutputDir, true);
+ } catch (IOException e) {
+ _logger.error("Failed to clean up pre-processing output path: {}", _preprocessedOutputDir);
+ }
+ }
+}
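When partitioning and sorting are enabled, fetchPartitioningConfig() and fetchSortingConfig() read the table config fetched from the controller, and the resizing thresholds (min.num.output.files, max.num.records) come from the table's customConfigs map. A sketch of a table config that would drive this job, assuming a hypothetical table "myTable" and column "memberId" (the builder calls mirror the integration-test changes later in this diff):

    // Sketch only: table and column names are illustrative.
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.common.config.ColumnPartitionConfig;
    import org.apache.pinot.common.config.SegmentPartitionConfig;
    import org.apache.pinot.common.config.TableConfig;
    import org.apache.pinot.common.utils.CommonConstants.Helix;

    public class PreprocessingTableConfigSketch {
      public static TableConfig build() {
        // Partition the hypothetical "memberId" column into 2 murmur partitions.
        Map<String, ColumnPartitionConfig> columnPartitionMap = new HashMap<>();
        columnPartitionMap.put("memberId", new ColumnPartitionConfig("murmur", 2));
        return new TableConfig.Builder(Helix.TableType.OFFLINE)
            .setTableName("myTable")
            .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionMap))
            .setSortedColumn("memberId")
            .build();
      }
    }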
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
similarity index 99%
rename from pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
rename to pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 2fe3537..ccf752c 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.hadoop.job.mapper;
+package org.apache.pinot.hadoop.job.mappers;
import com.google.common.base.Preconditions;
import java.io.File;
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
new file mode 100644
index 0000000..2c4d012
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.mappers;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
+
+public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
+ private String _sortedColumn = null;
+ private Schema _outputKeySchema;
+ private Schema _outputSchema;
+ private boolean _enablePartition = false;
+
+ @Override
+ public void setup(final Context context) {
+ Configuration configuration = context.getConfiguration();
+
+ String sortedColumn = configuration.get("sorted.column");
+ // Logging the configs for the mapper
+ LOGGER.info("Sorted Column: " + sortedColumn);
+ if (sortedColumn != null) {
+ _sortedColumn = sortedColumn;
+ }
+ _outputKeySchema = AvroJob.getMapOutputKeySchema(configuration);
+ _outputSchema = AvroJob.getMapOutputValueSchema(configuration);
+ _enablePartition = Boolean.parseBoolean(configuration.get(ENABLE_PARTITIONING));
+ LOGGER.info("Enable partitioning? " + _enablePartition);
+ }
+
+ @Override
+ public void map(AvroKey<GenericRecord> record, NullWritable value, final Context context)
+ throws IOException, InterruptedException {
+ final GenericRecord inputRecord = record.datum();
+ final Schema schema = inputRecord.getSchema();
+ Preconditions.checkArgument(_outputSchema.equals(schema), "The schema of all avro files should be the same!");
+
+ GenericRecord outputKey = new GenericData.Record(_outputKeySchema);
+ if (_sortedColumn == null) {
+ outputKey.put("hashcode", inputRecord.hashCode());
+ } else if (_enablePartition) {
+ outputKey.put(_sortedColumn, inputRecord.get(_sortedColumn));
+ } else {
+ outputKey.put(_sortedColumn, inputRecord.get(_sortedColumn));
+ outputKey.put("hashcode", inputRecord.hashCode());
+ }
+
+ try {
+ context.write(new AvroKey<>(outputKey), new AvroValue<>(inputRecord));
+ } catch (Exception e) {
+ LOGGER.error("Exception when writing context on mapper!");
+ throw e;
+ }
+ }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
new file mode 100644
index 0000000..43d1396
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GenericPartitioner.class);
+ private Configuration _configuration;
+ private String _partitionColumn;
+ private int _numPartitions;
+ private PartitionFunction _partitionFunction;
+
+ @Override
+ public void setConf(Configuration conf) {
+ _configuration = conf;
+ _partitionColumn = _configuration.get("partition.column");
+ _numPartitions = Integer.parseInt(_configuration.get("num.partitions"));
+ _partitionFunction =
+ PartitionFunctionFactory.getPartitionFunction(_configuration.get("partition.function", null), _numPartitions);
+
+ LOGGER.info("The partition function is: " + _partitionFunction.getClass().getName());
+ LOGGER.info("The partition column is: " + _partitionColumn);
+ LOGGER.info("Total number of partitions is: " + _numPartitions);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return _configuration;
+ }
+
+ @Override
+ public int getPartition(T genericRecordAvroKey, AvroValue<GenericRecord> genericRecordAvroValue, int numPartitions) {
+ final GenericRecord inputRecord = genericRecordAvroValue.datum();
+ final Object partitionColumnValue = inputRecord.get(_partitionColumn);
+ return _partitionFunction.getPartition(partitionColumnValue);
+ }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
new file mode 100644
index 0000000..54c0bbc
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.partitioners;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.data.partition.ByteArrayPartitionFunction;
+import org.apache.pinot.core.data.partition.HashCodePartitionFunction;
+import org.apache.pinot.core.data.partition.ModuloPartitionFunction;
+import org.apache.pinot.core.data.partition.MurmurPartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+
+
+public class PartitionFunctionFactory {
+
+ private PartitionFunctionFactory() {
+ }
+
+ public enum PartitionFunctionType {
+ Modulo,
+ Murmur,
+ HashCode,
+ ByteArray;
+ // Add more functions here.
+
+ private static final Map<String, PartitionFunctionType> VALUE_MAP = new HashMap<>();
+
+ static {
+ for (PartitionFunctionType functionType : PartitionFunctionType.values()) {
+ VALUE_MAP.put(functionType.name().toLowerCase(), functionType);
+ }
+ }
+
+ public static PartitionFunctionType fromString(String name) {
+ PartitionFunctionType functionType = VALUE_MAP.get(name.toLowerCase());
+
+ if (functionType == null) {
+ throw new IllegalArgumentException("No enum constant for: " + name);
+ }
+ return functionType;
+ }
+ }
+
+ /**
+ * This method generates and returns a partition function based on the provided string.
+ *
+ * @param functionName Name of the partition function
+ * @param numPartitions Number of partitions
+ * @return Partition function
+ */
+ public static PartitionFunction getPartitionFunction(String functionName, int numPartitions) {
+
+ // Return default partition function if there is no configuration given.
+ if (functionName == null) {
+ return new MurmurPartitionFunction(numPartitions);
+ }
+
+ PartitionFunctionType functionType;
+ try {
+ functionType = PartitionFunctionType.fromString(functionName);
+ } catch (IllegalArgumentException e) {
+ // By default, we use murmur
+ functionType = PartitionFunctionType.Murmur;
+ }
+
+ switch (functionType) {
+ case Modulo:
+ return new ModuloPartitionFunction(numPartitions);
+
+ case Murmur:
+ return new MurmurPartitionFunction(numPartitions);
+
+ case HashCode:
+ return new HashCodePartitionFunction(numPartitions);
+
+ case ByteArray:
+ return new ByteArrayPartitionFunction(numPartitions);
+
+ default:
+ return new MurmurPartitionFunction(numPartitions);
+ }
+ }
+}
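A short usage sketch of the factory; the wrapper class and the sample column value are illustrative assumptions:

    import org.apache.pinot.core.data.partition.PartitionFunction;
    import org.apache.pinot.hadoop.job.partitioners.PartitionFunctionFactory;

    public class PartitionFunctionSketch {
      public static void main(String[] args) {
        // Null or unrecognized names fall back to the murmur function.
        PartitionFunction fn = PartitionFunctionFactory.getPartitionFunction("murmur", 8);
        // GenericPartitioner passes the raw column value straight to getPartition().
        int partition = fn.getPartition("member-1234");
        System.out.println("partition = " + partition);
      }
    }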
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
new file mode 100644
index 0000000..d07e98f
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/SegmentPreprocessingReducer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job.reducers;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentPreprocessingReducer<T> extends Reducer<T, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingReducer.class);
+
+ private AvroMultipleOutputs _multipleOutputs;
+ private AtomicInteger _counter;
+ private int _maxNumberOfRecords;
+ private String _filePrefix;
+
+ @Override
+ public void setup(Context context) {
+ LOGGER.info("Using multiple outputs strategy.");
+ Configuration configuration = context.getConfiguration();
+ _multipleOutputs = new AvroMultipleOutputs(context);
+ _counter = new AtomicInteger();
+ // If it's 0, the output file won't be split into multiple files.
+ // If not, output file will be split when the number of records reaches this number.
+ _maxNumberOfRecords = configuration.getInt("max.num.records", 0);
+ LOGGER.info("Maximum number of records per file: {}", _maxNumberOfRecords);
+ _filePrefix = RandomStringUtils.randomAlphanumeric(4);
+ }
+
+ @Override
+ public void reduce(final T inputRecord, final Iterable<AvroValue<GenericRecord>> values, final Context context)
+ throws IOException, InterruptedException {
+ for (final AvroValue<GenericRecord> value : values) {
+ String fileName = generateFileName();
+ _multipleOutputs.write(new AvroKey<>(value.datum()), NullWritable.get(), fileName);
+ }
+ }
+
+ @Override
+ public void cleanup(Context context)
+ throws IOException, InterruptedException {
+ LOGGER.info("Clean up reducer.");
+ if (_multipleOutputs != null) {
+ _multipleOutputs.close();
+ _multipleOutputs = null;
+ }
+ LOGGER.info("Finished cleaning up reducer.");
+ }
+
+ private String generateFileName() {
+ if (_maxNumberOfRecords == 0) {
+ return _filePrefix;
+ } else {
+ return _filePrefix + (_counter.getAndIncrement() / _maxNumberOfRecords);
+ }
+ }
+}
+
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/JobPreparationHelper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/JobPreparationHelper.java
new file mode 100644
index 0000000..bd726d2
--- /dev/null
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/JobPreparationHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobPreparationHelper {
+ private static final Logger _logger = LoggerFactory.getLogger(JobPreparationHelper.class);
+
+ public static void addDepsJarToDistributedCacheHelper(FileSystem fileSystem, Job job, Path depsJarDir)
+ throws IOException {
+ FileStatus[] fileStatuses = fileSystem.listStatus(depsJarDir);
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isDirectory()) {
+ addDepsJarToDistributedCacheHelper(fileSystem, job, fileStatus.getPath());
+ } else {
+ Path depJarPath = fileStatus.getPath();
+ if (depJarPath.getName().endsWith(".jar")) {
+ _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
+ job.addCacheArchive(depJarPath.toUri());
+ }
+ }
+ }
+ }
+
+ public static void setDirPermission(FileSystem fileSystem, Path dirPath, String defaultPermissionsMask)
+ throws IOException {
+ if (defaultPermissionsMask != null) {
+ FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(defaultPermissionsMask));
+ _logger.info("Setting permission: {} to directory: {}", permission, dirPath);
+ fileSystem.setPermission(dirPath, permission);
+ }
+ }
+}
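A usage sketch for the permission helper, assuming an illustrative output path and the umask string "022" (a null mask leaves directory permissions untouched):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.pinot.hadoop.utils.JobPreparationHelper;

    public class PermissionSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Apply the default directory permission masked with "022" (illustrative value).
        JobPreparationHelper.setDirPermission(fs, new Path("/data/preprocessed/myTable"), "022");
      }
    }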
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 21c2140..914adda 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -24,13 +24,17 @@ import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableTaskConfig;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -67,6 +71,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
+ protected final File _preprocessingDir = new File(_tempDir, "preprocessingDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<KafkaServerStartable> _kafkaStarters;
@@ -188,6 +193,16 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
return TagNameUtils.DEFAULT_TENANT_NAME;
}
+ protected SegmentPartitionConfig getSegmentPartitionConfig() {
+
+ ColumnPartitionConfig columnPartitionConfig = new ColumnPartitionConfig("murmur", 2);
+ Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+ columnPartitionConfigMap.put("AirlineID", columnPartitionConfig);
+
+ SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(columnPartitionConfigMap);
+ return segmentPartitionConfig;
+ }
+
/**
* Get the Pinot connection.
*
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 0457159..d8fb9b9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -46,6 +46,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.http.HttpStatus;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TableTaskConfig;
@@ -307,16 +308,17 @@ public abstract class ClusterTest extends ControllerTest {
protected void addOfflineTable(String tableName, SegmentVersion segmentVersion)
throws Exception {
- addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null);
+ addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null);
}
protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- List<String> bloomFilterColumns, TableTaskConfig taskConfig)
+ List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
+ String sortedColumn)
throws Exception {
TableConfig tableConfig =
getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, bloomFilterColumns, taskConfig);
+ invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
if (!isUsingNewConfigFormat()) {
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonConfigString());
@@ -327,11 +329,12 @@ public abstract class ClusterTest extends ControllerTest {
protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
- List<String> bloomFilterColumns, TableTaskConfig taskConfig)
+ List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
+ String sortedColumn)
throws Exception {
TableConfig tableConfig =
getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
- invertedIndexColumns, bloomFilterColumns, taskConfig);
+ invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
if (!isUsingNewConfigFormat()) {
sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonConfigString());
@@ -342,12 +345,14 @@ public abstract class ClusterTest extends ControllerTest {
private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
- List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig) {
+ List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig,
+ SegmentPartitionConfig segmentPartitionConfig, String sortedColumn) {
return new TableConfig.Builder(Helix.TableType.OFFLINE).setTableName(tableName).setTimeColumnName(timeColumnName)
.setTimeType(timeType).setNumReplicas(3).setBrokerTenant(brokerTenant).setServerTenant(serverTenant)
.setLoadMode(loadMode).setSegmentVersion(segmentVersion.toString())
.setInvertedIndexColumns(invertedIndexColumns).setBloomFilterColumns(bloomFilterColumns)
- .setTaskConfig(taskConfig).build();
+ .setTaskConfig(taskConfig).setSegmentPartitionConfig(segmentPartitionConfig).setSortedColumn(sortedColumn)
+ .build();
}
protected void dropOfflineTable(String tableName)
@@ -481,10 +486,10 @@ public abstract class ClusterTest extends ControllerTest {
String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
- TableTaskConfig taskConfig, String streamConsumerFactoryName)
+ TableTaskConfig taskConfig, String streamConsumerFactoryName, SegmentPartitionConfig segmentPartitionConfig)
throws Exception {
addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
- invertedIndexColumns, bloomFilterColumns, taskConfig);
+ invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index 8c9ffe5..fc14789 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -161,7 +161,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
*/
private void setupOfflineTable(String table) throws Exception {
_realtimeTableConfig = null;
- addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null);
+ addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
completeTableConfiguration();
}
@@ -184,7 +184,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
Assert.assertNotNull(outgoingTimeUnit);
String timeType = outgoingTimeUnit.toString();
- addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null);
+ addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
completeTableConfiguration();
ExecutorService executor = Executors.newCachedThreadPool();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 36e083c..5bb9b65 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -19,26 +19,48 @@
package org.apache.pinot.integration.tests;
import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.hadoop.job.JobConfigConstants;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;
import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
@@ -66,7 +88,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Start the MR Yarn cluster
final Configuration conf = new Configuration();
- _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1);
+ _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
_mrCluster.init(conf);
_mrCluster.start();
@@ -93,7 +115,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Create the table
addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null,
- getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig());
+ getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(), getSortedColumn());
// Generate and push Pinot segments from Hadoop
generateAndPushSegmentsFromHadoop();
@@ -154,6 +176,25 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost());
properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort());
+ Properties preComputeProperties = new Properties();
+ preComputeProperties.putAll(properties);
+ preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
+ preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
+
+ preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath());
+ preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath());
+ properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath());
+
+ // Run segment pre-processing job
+ SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties);
+ Configuration preComputeConfig = _mrCluster.getConfig();
+ segmentPreprocessingJob.setConf(preComputeConfig);
+ segmentPreprocessingJob.run();
+ LOGGER.info("Segment preprocessing job finished.");
+
+ // Verify partitioning and sorting.
+ verifyPreprocessingJob(preComputeConfig);
+
// Run segment creation job
SegmentCreationJob creationJob = new SegmentCreationJob(properties);
Configuration config = _mrCluster.getConfig();
@@ -165,4 +206,46 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
pushJob.setConf(_mrCluster.getConfig());
pushJob.run();
}
+
+ private void verifyPreprocessingJob(Configuration preComputeConfig) throws IOException {
+ // Fetch partitioning config and sorting config.
+ SegmentPartitionConfig segmentPartitionConfig = getSegmentPartitionConfig();
+ Map.Entry<String, ColumnPartitionConfig>
+ entry = segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+ String partitionColumn = entry.getKey();
+ String partitionFunctionString = entry.getValue().getFunctionName();
+ int numPartitions = entry.getValue().getNumPartitions();
+ PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionString, numPartitions);
+ String sortedColumn = getSortedColumn();
+
+ // Get output files.
+ FileSystem fileSystem = FileSystem.get(preComputeConfig);
+ FileStatus[] fileStatuses = fileSystem.listStatus(new Path(_preprocessingDir.getPath()));
+ Assert.assertEquals(fileStatuses.length, numPartitions, "Number of output files should be the same as the number of partitions.");
+
+ Set<Integer> partitionIdSet = new HashSet<>();
+ Object previousObject;
+ for (FileStatus fileStatus : fileStatuses) {
+ Path avroFile = fileStatus.getPath();
+ DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>());
+
+ // Reset hash set and previous object
+ partitionIdSet.clear();
+ previousObject = null;
+ while (dataFileStream.hasNext()) {
+ GenericRecord genericRecord = dataFileStream.next();
+ partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn)));
+ Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should be the same within a file.");
+ org.apache.avro.Schema sortedColumnSchema = genericRecord.getSchema().getField(sortedColumn).schema();
+ Object currentObject = genericRecord.get(sortedColumn);
+ if (previousObject == null) {
+ previousObject = currentObject;
+ continue;
+ }
+ // The values of the sorted column should be in ascending order.
+ Assert.assertTrue(GenericData.get().compare(previousObject, currentObject, sortedColumnSchema) <= 0);
+ previousObject = currentObject;
+ }
+ dataFileStream.close();
+ }
+ }
}
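The test above wires the two Hadoop jobs together by pointing the creation job's PATH_TO_INPUT at the preprocessing job's PREPROCESS_PATH_TO_OUTPUT, with partitioning and sorting enabled via ENABLE_PARTITIONING and ENABLE_SORTING. A minimal sketch of the same wiring outside the MiniMRYarnCluster harness, assuming only the property keys from JobConfigConstants shown earlier; the class name and paths are placeholders, and the remaining segment-generation and push properties (output path, schema, controller host/port, etc.) are omitted:

    import java.util.Properties;
    import org.apache.pinot.hadoop.job.SegmentCreationJob;
    import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
    import static org.apache.pinot.hadoop.job.JobConfigConstants.*;

    public class PreprocessThenCreateSketch {
      public static void main(String[] args) throws Exception {
        Properties preprocess = new Properties();
        preprocess.setProperty(PATH_TO_INPUT, "/data/rawAvro");                  // placeholder input dir
        preprocess.setProperty(PREPROCESS_PATH_TO_OUTPUT, "/data/preprocessed"); // placeholder output dir
        preprocess.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString());
        preprocess.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
        // Optionally call setConf(...) first, as the test does, to target a specific Hadoop cluster.
        new SegmentPreprocessingJob(preprocess).run();

        Properties create = new Properties();
        create.putAll(preprocess);
        // The creation job then reads the partitioned/sorted Avro written by the preprocessing job.
        create.setProperty(PATH_TO_INPUT, "/data/preprocessed");
        new SegmentCreationJob(create).run();
      }
    }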
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index f9ca74a..875969f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -135,7 +135,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
- getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
+ getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
completeTableConfiguration();
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index 6e5474f..c1f77d8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -295,7 +295,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
String timeType = outgoingTimeUnit.toString();
addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
_realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
- _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName());
+ _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName(), null);
// Upload all segments
uploadSegments(_tarDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 8d9d144..75a69a8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -120,7 +120,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Create the table
addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
- getBloomFilterIndexColumns(), getTaskConfig());
+ getBloomFilterIndexColumns(), getTaskConfig(), null, null);
completeTableConfiguration();
@@ -225,7 +225,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Update table config and trigger reload
updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
- UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig());
+ UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig(), null, null);
updateTableConfiguration();
@@ -252,7 +252,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Update table config and trigger reload
updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
- UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig());
+ UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig(), null, null);
updateTableConfiguration();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 438655a..ad50f05 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -86,9 +86,9 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
taskTypeConfigsMap.put(TestTaskGenerator.TASK_TYPE, Collections.emptyMap());
taskConfig.setTaskTypeConfigsMap(taskTypeConfigsMap);
- addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig);
- addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig);
- addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null);
+ addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
+ addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
+ addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null);
_helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
diff --git a/pom.xml b/pom.xml
index 95bd02c..4e27925 100644
--- a/pom.xml
+++ b/pom.xml
@@ -668,6 +668,12 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>1.7.4</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>