Posted to commits@pinot.apache.org by jl...@apache.org on 2021/11/10 19:37:20 UTC
[pinot] 01/01: Support default null value in data preprocessing job
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch support-default-null-value-in-preprocessing
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 87f83bf548ef51b93bf96fba9a3aa6fa25741b67
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Nov 10 11:36:45 2021 -0800
Support default null value in data preprocessing job
---
.../hadoop/job/HadoopSegmentPreprocessingJob.java | 9 ++++-
.../pinot/hadoop/job/InternalConfigConstants.java | 2 ++
.../job/mappers/AvroDataPreprocessingMapper.java | 19 +++++-----
.../job/mappers/OrcDataPreprocessingMapper.java | 11 +++---
.../AvroDataPreprocessingPartitioner.java | 40 ++++++++++++---------
.../OrcDataPreprocessingPartitioner.java | 42 ++++++++++++++--------
.../job/preprocess/DataPreprocessingHelper.java | 9 ++++-
.../preprocess/DataPreprocessingHelperTest.java | 20 +++++++++--
8 files changed, 103 insertions(+), 49 deletions(-)
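In short: the job now reads each column's configured default null value from the
Pinot schema and threads it through the Hadoop job configuration, so records
missing a sorting or partition value fall back to that default instead of
failing a precondition. A minimal standalone sketch of the fallback rule itself
(class and method names here are illustrative, not the project's API):

    // Hypothetical illustration of the null-fallback rule this commit applies.
    public final class NullFallbackDemo {
      static Object orDefault(Object rawValue, String defaultNullValue) {
        // Missing values fall back to the column's schema-level default.
        return rawValue != null ? rawValue : defaultNullValue;
      }

      public static void main(String[] args) {
        System.out.println(orDefault(null, "0"));  // -> 0 (default path)
        System.out.println(orDefault(42, "0"));    // -> 42 (normal path)
      }
    }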
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 0a6e10e..85f5c6c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -63,9 +63,11 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
private String _partitionColumn;
private int _numPartitions;
private String _partitionFunction;
+ private String _partitionColumnDefaultNullValue;
private String _sortingColumn;
private FieldSpec.DataType _sortingColumnType;
+ private String _sortingColumnDefaultNullValue;
private int _numOutputFiles;
private int _maxNumRecordsPerFile;
@@ -101,7 +103,7 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
dataPreprocessingHelper
.registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
- _sortingColumn, _sortingColumnType,
+ _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
_numOutputFiles, _maxNumRecordsPerFile);
Job job = dataPreprocessingHelper.setUpJob();
@@ -156,6 +158,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
_partitionColumn = columnPartitionMap.keySet().iterator().next();
_numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
_partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+ _partitionColumnDefaultNullValue =
+ _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
}
} else {
LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
@@ -201,6 +205,9 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
}
}
+ if (_sortingColumn != null) {
+ _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
+ }
}
private void fetchResizingConfig() {
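The defaults themselves come straight from the table schema's field specs via
getDefaultNullValueString(), as the hunks above show. A hedged sketch of what
that call returns (using Pinot's public FieldSpec API; the column name is made
up):

    import org.apache.pinot.spi.data.DimensionFieldSpec;
    import org.apache.pinot.spi.data.FieldSpec;

    public class DefaultNullValueDemo {
      public static void main(String[] args) {
        FieldSpec spec = new DimensionFieldSpec("memberId", FieldSpec.DataType.INT, true);
        // Unless the schema overrides it, a single-value INT dimension's
        // default null value string is Integer.MIN_VALUE ("-2147483648").
        System.out.println(spec.getDefaultNullValueString());
      }
    }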
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index ef898e3..b26fc38 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -41,9 +41,11 @@ public class InternalConfigConstants {
public static final String PARTITION_COLUMN_CONFIG = "partition.column";
public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+ public static final String PARTITION_COLUMN_DEFAULT_NULL_VALUE = "partition.default.null.value";
public static final String SORTING_COLUMN_CONFIG = "sorting.column";
public static final String SORTING_COLUMN_TYPE = "sorting.type";
+ public static final String SORTING_COLUMN_DEFAULT_NULL_VALUE = "sorting.default.null.value";
public static final String ENABLE_PARTITIONING = "enable.partitioning";
@Deprecated
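These two new keys ride on the Hadoop job Configuration like the existing ones:
the preprocessing helper writes them at job-setup time and the
mappers/partitioners read them back in setup()/setConf(). A minimal sketch of
that round trip (key strings copied from the constants above):

    import org.apache.hadoop.conf.Configuration;

    public class ConfigRoundTripDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Helper side: publish the schema defaults.
        conf.set("partition.default.null.value", "-2147483648");
        conf.set("sorting.default.null.value", "0");
        // Task side: read them back. An unset key yields null, so readers
        // must tolerate a missing default.
        System.out.println(conf.get("partition.default.null.value"));
        System.out.println(conf.get("sorting.default.null.value"));
      }
    }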
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
index 8d2a3eb..d9f17ac 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class AvroDataPreprocessingMapper
private String _sortingColumn = null;
private FieldSpec.DataType _sortingColumnType = null;
+ private String _sortingColumnDefaultNullValue = null;
private AvroRecordExtractor _avroRecordExtractor;
@Override
@@ -51,8 +52,9 @@ public class AvroDataPreprocessingMapper
if (sortingColumnConfig != null) {
_sortingColumn = sortingColumnConfig;
_sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
- LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
- _sortingColumnType);
+ _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+ LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+ _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
} else {
LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
}
@@ -64,18 +66,17 @@ public class AvroDataPreprocessingMapper
GenericRecord record = key.datum();
if (_sortingColumn != null) {
Object object = record.get(_sortingColumn);
- Preconditions
- .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
- record);
- Object convertedValue = _avroRecordExtractor.convert(object);
- Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+ Object valueToConvert = object != null ? _avroRecordExtractor.convert(object) : _sortingColumnDefaultNullValue;
+ Preconditions.checkState(valueToConvert != null, "Invalid value: %s for sorting column: %s in record: %s", object,
_sortingColumn, record);
+
WritableComparable outputKey;
try {
- outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+ outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
} catch (Exception e) {
throw new IllegalStateException(
- String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+ String
+ .format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
e);
}
context.write(outputKey, new AvroValue<>(record));
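Behaviorally, the mapper change above means a null sorting value no longer
trips the not-null precondition; the configured default is converted into the
map output key instead. One subtlety: the default arrives as a String even when
the sorting column is numeric, so the conversion step has to parse it. A
self-contained sketch of that parse under this assumption (the helper below is
hypothetical, not the project's DataPreprocessingUtils):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;

    public class SortKeyDemo {
      // Sketch: build an INT sort key from either a converted record value
      // (a Number) or a String default null value.
      static WritableComparable toIntKey(Object value) {
        int v = value instanceof Number ? ((Number) value).intValue()
                                        : Integer.parseInt(value.toString());
        return new IntWritable(v);
      }

      public static void main(String[] args) {
        System.out.println(toIntKey("0"));  // default-null-value path
        System.out.println(toIntKey(42));   // normal converted-value path
      }
    }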
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
index d7d0694..8ad9d84 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
private final OrcValue _valueWrapper = new OrcValue();
private String _sortingColumn = null;
private FieldSpec.DataType _sortingColumnType = null;
+ private String _sortingColumnDefaultNullValue = null;
private int _sortingColumnId = -1;
@Override
@@ -50,8 +51,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
if (sortingColumnConfig != null) {
_sortingColumn = sortingColumnConfig;
_sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
- LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
- _sortingColumnType);
+ _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+ LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+ _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
} else {
LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
}
@@ -72,8 +74,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
WritableComparable outputKey;
try {
- outputKey = DataPreprocessingUtils
- .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+ Object valueToConvert =
+ sortingColumnValue != null ? OrcUtils.convert(sortingColumnValue) : _sortingColumnDefaultNullValue;
+ outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
} catch (Exception e) {
throw new IllegalStateException(String
.format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
index f0c05db..22c4054 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -19,6 +19,7 @@
package org.apache.pinot.hadoop.job.partitioners;
import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.conf.Configurable;
@@ -38,22 +39,26 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
private Configuration _conf;
private String _partitionColumn;
+ private int _numPartitions;
private PartitionFunction _partitionFunction;
+ private String _partitionColumnDefaultNullValue;
private AvroRecordExtractor _avroRecordExtractor;
+ private final AtomicInteger _counter = new AtomicInteger(0);
+
@Override
public void setConf(Configuration conf) {
_conf = conf;
_avroRecordExtractor = new AvroRecordExtractor();
_partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
- int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
- _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+ _numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+ _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, _numPartitions);
+ _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
LOGGER.info(
"Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions:"
- + " {}",
- _partitionColumn,
- partitionFunctionName, numPartitions);
+ + " {}, default null value: {}",
+ _partitionColumn, partitionFunctionName, _numPartitions, _partitionColumnDefaultNullValue);
}
@Override
@@ -65,16 +70,19 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
GenericRecord record = value.datum();
Object object = record.get(_partitionColumn);
- Preconditions
- .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
- record);
- Object convertedValue = _avroRecordExtractor.convert(object);
- Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
- _partitionColumn, record);
- Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
- "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
- convertedValue.getClass(), record);
- // NOTE: Always partition with String type value because Broker uses String type value to prune segments
- return _partitionFunction.getPartition(convertedValue.toString());
+ if (object == null || object.toString().equals(_partitionColumnDefaultNullValue)) {
+ return Math.abs(_counter.getAndIncrement()) % _numPartitions;
+ } else {
+ Object convertedValue = _avroRecordExtractor.convert(object);
+ Preconditions
+ .checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
+ _partitionColumn, record);
+ Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+ "Value for partition column: %s must be either a Number or a String, found: %s in record: %s",
+ _partitionColumn,
+ convertedValue.getClass(), record);
+ // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+ return _partitionFunction.getPartition(convertedValue.toString());
+ }
}
}
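For records whose partition value is missing (or equal to the default null
value), the partitioner now assigns reducers round-robin via the new
AtomicInteger counter rather than aborting. A self-contained sketch of that
fallback scheme (it mirrors the code above; Math.abs guards against the counter
overflowing into negatives, with the usual Math.abs(Integer.MIN_VALUE) caveat):

    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundRobinFallbackDemo {
      private final AtomicInteger counter = new AtomicInteger(0);
      private final int numPartitions;

      RoundRobinFallbackDemo(int numPartitions) {
        this.numPartitions = numPartitions;
      }

      // Null-valued records cycle through partitions 0..numPartitions-1.
      int nextPartition() {
        return Math.abs(counter.getAndIncrement()) % numPartitions;
      }

      public static void main(String[] args) {
        RoundRobinFallbackDemo demo = new RoundRobinFallbackDemo(3);
        for (int i = 0; i < 6; i++) {
          System.out.print(demo.nextPartition() + " ");  // 0 1 2 0 1 2
        }
      }
    }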
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
index 914d145..b70d8f1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.partitioners;
import com.google.common.base.Preconditions;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
@@ -38,21 +39,25 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
private Configuration _conf;
private String _partitionColumn;
+ private int _numPartitions;
private PartitionFunction _partitionFunction;
+ private String _partitionColumnDefaultNullValue;
private int _partitionColumnId = -1;
+ private final AtomicInteger _counter = new AtomicInteger(0);
+
@Override
public void setConf(Configuration conf) {
_conf = conf;
_partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
- int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
- _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+ _numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+ _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, _numPartitions);
+ _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
LOGGER.info(
"Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: "
- + "{}",
- _partitionColumn,
- partitionFunctionName, numPartitions);
+ + "{}, default null value: {}",
+ _partitionColumn, partitionFunctionName, _numPartitions, _partitionColumnDefaultNullValue);
}
@Override
@@ -71,16 +76,23 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
}
WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
- Object convertedValue;
- try {
- convertedValue = OrcUtils.convert(partitionColumnValue);
- } catch (Exception e) {
- throw new IllegalStateException(
- String.format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
- _partitionColumn, _partitionColumnId, orcStruct),
- e);
+ String convertedValueString = null;
+ if (partitionColumnValue != null) {
+ try {
+ Object convertedValue = OrcUtils.convert(partitionColumnValue);
+ convertedValueString = convertedValue.toString();
+ } catch (Exception e) {
+ throw new IllegalStateException(String
+ .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
+ _partitionColumn, _partitionColumnId, orcStruct), e);
+ }
+ }
+
+ if (convertedValueString == null || convertedValueString.equals(_partitionColumnDefaultNullValue)) {
+ return Math.abs(_counter.getAndIncrement()) % _numPartitions;
+ } else {
+ // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+ return _partitionFunction.getPartition(convertedValueString);
}
- // NOTE: Always partition with String type value because Broker uses String type value to prune segments
- return _partitionFunction.getPartition(convertedValue.toString());
}
}
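Note that in both partitioners the round-robin path also triggers when a
record's value equals the default null value string, so rows that carry the
default explicitly are scattered across partitions just like missing ones.
Presumably this means the broker cannot partition-prune queries filtering on
the default value; the apparent trade-off is avoiding funneling every
null-substituted row into a single reducer.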
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
index 287c51b..ae50168 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
@@ -58,9 +58,11 @@ public abstract class DataPreprocessingHelper {
String _partitionColumn;
int _numPartitions;
String _partitionFunction;
+ String _partitionColumnDefaultNullValue;
String _sortingColumn;
private FieldSpec.DataType _sortingColumnType;
+ String _sortingColumnDefaultNullValue;
private int _numOutputFiles;
private int _maxNumRecordsPerFile;
@@ -79,16 +81,19 @@ public abstract class DataPreprocessingHelper {
}
public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
- String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+ String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+ FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
int maxNumRecordsPerFile) {
_tableConfig = tableConfig;
_pinotTableSchema = tableSchema;
_partitionColumn = partitionColumn;
_numPartitions = numPartitions;
_partitionFunction = partitionFunction;
+ _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
_sortingColumn = sortingColumn;
_sortingColumnType = sortingColumnType;
+ _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
_numOutputFiles = numOutputFiles;
_maxNumRecordsPerFile = maxNumRecordsPerFile;
@@ -113,6 +118,7 @@ public abstract class DataPreprocessingHelper {
LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+ jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, _sortingColumnDefaultNullValue);
switch (_sortingColumnType) {
case INT:
@@ -148,6 +154,7 @@ public abstract class DataPreprocessingHelper {
if (_partitionFunction != null) {
jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
}
+ jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, _partitionColumnDefaultNullValue);
jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
job.setPartitionerClass(getPartitioner());
} else {
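Call sites of registerConfigs must be updated for the widened signature, with
the two default null values slotted after the partition function and the
sorting column type respectively. A sketch of the new call shape only
(placeholder arguments, not a runnable program on its own):

    dataPreprocessingHelper.registerConfigs(
        tableConfig, pinotTableSchema,
        "memberId", 8, "Murmur", "-2147483648",        // partition column/count/function/default
        "daysSinceEpoch", FieldSpec.DataType.INT, "0", // sorting column/type/default
        0, 0);                                         // numOutputFiles, maxNumRecordsPerFile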
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
index dd132cf..2f97f72 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pinot.hadoop.job.InternalConfigConstants;
@@ -35,6 +36,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -66,11 +68,23 @@ public class DataPreprocessingHelperTest {
Schema schema = new Schema.SchemaBuilder()
.addDateTime("time_day", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
- dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "column4",
- FieldSpec.DataType.INT, 0, 0);
+ dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "0", "column4",
+ FieldSpec.DataType.INT, "0", 0, 0);
Job job = dataPreprocessingHelper.setUpJob();
+ Configuration conf = job.getConfiguration();
assertNotNull(job);
- assertNull(job.getConfiguration().get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+ assertNull(conf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+
+ // Validate partitioning configs.
+ assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG), "column2");
+ assertEquals(conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG), "Murmur");
+ assertEquals(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG), "1");
+ assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE), "0");
+
+ // Validate sorting configs.
+ assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG), "column4");
+ assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_TYPE), "INT");
+ assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE), "0");
}
}
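To run just the updated test locally, a standard Maven invocation along these
lines should work from the repo root (exact flags may differ by branch; add -am
to build upstream modules first if needed):

    mvn -pl pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop \
        -Dtest=DataPreprocessingHelperTest test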