Posted to commits@pinot.apache.org by jl...@apache.org on 2021/11/10 19:37:20 UTC

[pinot] 01/01: Support default null value in data preprocessing job

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch support-default-null-value-in-preprocessing
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 87f83bf548ef51b93bf96fba9a3aa6fa25741b67
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Nov 10 11:36:45 2021 -0800

    Support default null value in data preprocessing job
---
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  |  9 ++++-
 .../pinot/hadoop/job/InternalConfigConstants.java  |  2 ++
 .../job/mappers/AvroDataPreprocessingMapper.java   | 19 +++++-----
 .../job/mappers/OrcDataPreprocessingMapper.java    | 11 +++---
 .../AvroDataPreprocessingPartitioner.java          | 40 ++++++++++++---------
 .../OrcDataPreprocessingPartitioner.java           | 42 ++++++++++++++--------
 .../job/preprocess/DataPreprocessingHelper.java    |  9 ++++-
 .../preprocess/DataPreprocessingHelperTest.java    | 20 +++++++++--
 8 files changed, 103 insertions(+), 49 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 0a6e10e..85f5c6c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -63,9 +63,11 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   private String _partitionColumn;
   private int _numPartitions;
   private String _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
 
   private String _sortingColumn;
   private FieldSpec.DataType _sortingColumnType;
+  private String _sortingColumnDefaultNullValue;
 
   private int _numOutputFiles;
   private int _maxNumRecordsPerFile;
@@ -101,7 +103,7 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
     dataPreprocessingHelper
         .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
-            _sortingColumn, _sortingColumnType,
+            _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
             _numOutputFiles, _maxNumRecordsPerFile);
 
     Job job = dataPreprocessingHelper.setUpJob();
@@ -156,6 +158,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         _partitionColumn = columnPartitionMap.keySet().iterator().next();
         _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
         _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+        _partitionColumnDefaultNullValue =
+            _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
       }
     } else {
       LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
@@ -201,6 +205,9 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
       }
     }
+    if (_sortingColumn != null) {
+      _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
+    }
   }
 
   private void fetchResizingConfig() {
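
For context, the default null value string fetched above comes straight
from the Pinot schema's FieldSpec. A minimal sketch of that lookup using
the pinot-spi API (the schema and field names here are made up):

    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;

    public class DefaultNullValueLookup {
      public static void main(String[] args) {
        // Hypothetical schema with one INT dimension and no explicit
        // defaultNullValue, so Pinot falls back to the built-in default
        // for the type (Integer.MIN_VALUE for INT dimensions).
        Schema schema = new Schema.SchemaBuilder()
            .setSchemaName("demo")
            .addSingleValueDimension("memberId", FieldSpec.DataType.INT)
            .build();

        // This is the string the job keeps and later ships to the mappers
        // and partitioners through the Hadoop job configuration.
        String defaultNullValue =
            schema.getFieldSpecFor("memberId").getDefaultNullValueString();
        System.out.println(defaultNullValue); // -2147483648
      }
    }
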
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index ef898e3..b26fc38 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -41,9 +41,11 @@ public class InternalConfigConstants {
   public static final String PARTITION_COLUMN_CONFIG = "partition.column";
   public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
   public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+  public static final String PARTITION_COLUMN_DEFAULT_NULL_VALUE = "partition.default.null.value";
 
   public static final String SORTING_COLUMN_CONFIG = "sorting.column";
   public static final String SORTING_COLUMN_TYPE = "sorting.type";
+  public static final String SORTING_COLUMN_DEFAULT_NULL_VALUE = "sorting.default.null.value";
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
 
   @Deprecated
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
index 8d2a3eb..d9f17ac 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class AvroDataPreprocessingMapper
 
   private String _sortingColumn = null;
   private FieldSpec.DataType _sortingColumnType = null;
+  private String _sortingColumnDefaultNullValue = null;
   private AvroRecordExtractor _avroRecordExtractor;
 
   @Override
@@ -51,8 +52,9 @@ public class AvroDataPreprocessingMapper
     if (sortingColumnConfig != null) {
       _sortingColumn = sortingColumnConfig;
       _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
-      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
-          _sortingColumnType);
+      _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+          _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
     } else {
       LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
     }
@@ -64,18 +66,17 @@ public class AvroDataPreprocessingMapper
     GenericRecord record = key.datum();
     if (_sortingColumn != null) {
       Object object = record.get(_sortingColumn);
-      Preconditions
-          .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
-              record);
-      Object convertedValue = _avroRecordExtractor.convert(object);
-      Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+      Object valueToConvert = object != null ? _avroRecordExtractor.convert(object) : _sortingColumnDefaultNullValue;
+      Preconditions.checkState(valueToConvert != null, "Invalid value: %s for sorting column: %s in record: %s", object,
           _sortingColumn, record);
+
       WritableComparable outputKey;
       try {
-        outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
       } catch (Exception e) {
         throw new IllegalStateException(
-            String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+            String
+                .format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
             e);
       }
       context.write(outputKey, new AvroValue<>(record));
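
Behavioral note on the hunk above: a missing sorting-column value used to
trip the Preconditions check; it now falls back to the schema's default
null value string before conversion. A standalone sketch of that fallback
(helper names are hypothetical; the real mapper then converts the result
with DataPreprocessingUtils.convertToWritableComparable, which is assumed
to parse the default string into the sorting column's type):

    import java.util.function.Function;

    public final class SortingValueFallback {
      // Mirrors the mapper's new logic: a present value goes through the
      // record extractor; a missing one becomes the default-null string.
      static Object resolve(Object rawValue, Function<Object, Object> convert,
          String defaultNullValue) {
        return rawValue != null ? convert.apply(rawValue) : defaultNullValue;
      }

      public static void main(String[] args) {
        System.out.println(resolve(42L, Object::toString, "0")); // 42
        System.out.println(resolve(null, Object::toString, "0")); // 0
      }
    }

The OrcDataPreprocessingMapper change below applies the same fallback to
ORC field values.
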
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
index d7d0694..8ad9d84 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
   private final OrcValue _valueWrapper = new OrcValue();
   private String _sortingColumn = null;
   private FieldSpec.DataType _sortingColumnType = null;
+  private String _sortingColumnDefaultNullValue = null;
   private int _sortingColumnId = -1;
 
   @Override
@@ -50,8 +51,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
     if (sortingColumnConfig != null) {
       _sortingColumn = sortingColumnConfig;
       _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
-      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
-          _sortingColumnType);
+      _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+          _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
     } else {
       LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
     }
@@ -72,8 +74,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
       WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
       WritableComparable outputKey;
       try {
-        outputKey = DataPreprocessingUtils
-            .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+        Object valueToConvert =
+            sortingColumnValue != null ? OrcUtils.convert(sortingColumnValue) : _sortingColumnDefaultNullValue;
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
       } catch (Exception e) {
         throw new IllegalStateException(String
             .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
index f0c05db..22c4054 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.hadoop.job.partitioners;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.hadoop.conf.Configurable;
@@ -38,22 +39,26 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
 
   private Configuration _conf;
   private String _partitionColumn;
+  private int _numPartitions;
   private PartitionFunction _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
   private AvroRecordExtractor _avroRecordExtractor;
 
+  private final AtomicInteger _counter = new AtomicInteger(0);
+
   @Override
   public void setConf(Configuration conf) {
     _conf = conf;
     _avroRecordExtractor = new AvroRecordExtractor();
     _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
     String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
-    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
-    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    _numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, _numPartitions);
+    _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
     LOGGER.info(
         "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions:"
-            + " {}",
-        _partitionColumn,
-        partitionFunctionName, numPartitions);
+            + " {}, default null value: {}",
+        _partitionColumn, partitionFunctionName, _numPartitions, _partitionColumnDefaultNullValue);
   }
 
   @Override
@@ -65,16 +70,19 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
   public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
     GenericRecord record = value.datum();
     Object object = record.get(_partitionColumn);
-    Preconditions
-        .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
-            record);
-    Object convertedValue = _avroRecordExtractor.convert(object);
-    Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
-        _partitionColumn, record);
-    Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
-        "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
-        convertedValue.getClass(), record);
-    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
-    return _partitionFunction.getPartition(convertedValue.toString());
+    if (object == null || object.toString().equals(_partitionColumnDefaultNullValue)) {
+      return Math.abs(_counter.getAndIncrement()) % _numPartitions;
+    } else {
+      Object convertedValue = _avroRecordExtractor.convert(object);
+      Preconditions
+          .checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
+              _partitionColumn, record);
+      Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+          "Value for partition column: %s must be either a Number or a String, found: %s in record: %s",
+          _partitionColumn,
+          convertedValue.getClass(), record);
+      // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+      return _partitionFunction.getPartition(convertedValue.toString());
+    }
   }
 }
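
The key idea in this partitioner change: rows whose partition column is
missing, or equals the default null string (the Avro variant compares the
raw value's toString(), the ORC variant below the converted value's
string), are no longer routed through the partition function, which would
pile them all into the sentinel value's single partition; instead they
are spread round-robin via an AtomicInteger. A condensed sketch of the
idea, where the hash branch stands in for PartitionFunction#getPartition
(e.g. Murmur):

    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundRobinNullPartitioning {
      private final AtomicInteger _counter = new AtomicInteger(0);

      int partitionFor(String value, String defaultNullValue, int numPartitions) {
        if (value == null || value.equals(defaultNullValue)) {
          // Spread null/default rows evenly across all partitions.
          // Math.abs handles counter wrap-around (Math.floorMod would
          // also cover the rare Integer.MIN_VALUE value that Math.abs
          // leaves negative).
          return Math.abs(_counter.getAndIncrement()) % numPartitions;
        }
        // Stand-in for the configured PartitionFunction; the real code
        // always partitions on the String value because the Broker prunes
        // segments using String-typed values.
        return Math.floorMod(value.hashCode(), numPartitions);
      }
    }

One consequence, presumably acceptable for null data: rows carrying the
default null value no longer map to a deterministic partition, so they
cannot benefit from partition-based segment pruning.
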
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
index 914d145..b70d8f1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.partitioners;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
@@ -38,21 +39,25 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
 
   private Configuration _conf;
   private String _partitionColumn;
+  private int _numPartitions;
   private PartitionFunction _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
   private int _partitionColumnId = -1;
 
+  private final AtomicInteger _counter = new AtomicInteger(0);
+
   @Override
   public void setConf(Configuration conf) {
     _conf = conf;
     _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
     String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
-    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
-    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    _numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, _numPartitions);
+    _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
     LOGGER.info(
         "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: "
-            + "{}",
-        _partitionColumn,
-        partitionFunctionName, numPartitions);
+            + "{}, default null value: {}",
+        _partitionColumn, partitionFunctionName, _numPartitions, _partitionColumnDefaultNullValue);
   }
 
   @Override
@@ -71,16 +76,23 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
       LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
     }
     WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
-    Object convertedValue;
-    try {
-      convertedValue = OrcUtils.convert(partitionColumnValue);
-    } catch (Exception e) {
-      throw new IllegalStateException(
-          String.format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
-              _partitionColumn, _partitionColumnId, orcStruct),
-          e);
+    String convertedValueString = null;
+    if (partitionColumnValue != null) {
+      try {
+        Object convertedValue = OrcUtils.convert(partitionColumnValue);
+        convertedValueString = convertedValue.toString();
+      } catch (Exception e) {
+        throw new IllegalStateException(String
+            .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
+                _partitionColumn, _partitionColumnId, orcStruct), e);
+      }
+    }
+
+    if (convertedValueString == null || convertedValueString.equals(_partitionColumnDefaultNullValue)) {
+      return Math.abs(_counter.getAndIncrement()) % _numPartitions;
+    } else {
+      // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+      return _partitionFunction.getPartition(convertedValueString);
     }
-    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
-    return _partitionFunction.getPartition(convertedValue.toString());
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
index 287c51b..ae50168 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
@@ -58,9 +58,11 @@ public abstract class DataPreprocessingHelper {
   String _partitionColumn;
   int _numPartitions;
   String _partitionFunction;
+  String _partitionColumnDefaultNullValue;
 
   String _sortingColumn;
   private FieldSpec.DataType _sortingColumnType;
+  String _sortingColumnDefaultNullValue;
 
   private int _numOutputFiles;
   private int _maxNumRecordsPerFile;
@@ -79,16 +81,19 @@ public abstract class DataPreprocessingHelper {
   }
 
   public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
-      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
       int maxNumRecordsPerFile) {
     _tableConfig = tableConfig;
     _pinotTableSchema = tableSchema;
     _partitionColumn = partitionColumn;
     _numPartitions = numPartitions;
     _partitionFunction = partitionFunction;
+    _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
 
     _sortingColumn = sortingColumn;
     _sortingColumnType = sortingColumnType;
+    _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
 
     _numOutputFiles = numOutputFiles;
     _maxNumRecordsPerFile = maxNumRecordsPerFile;
@@ -113,6 +118,7 @@ public abstract class DataPreprocessingHelper {
       LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
       jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
       jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, _sortingColumnDefaultNullValue);
 
       switch (_sortingColumnType) {
         case INT:
@@ -148,6 +154,7 @@ public abstract class DataPreprocessingHelper {
       if (_partitionFunction != null) {
         jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
       }
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, _partitionColumnDefaultNullValue);
       jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
       job.setPartitionerClass(getPartitioner());
     } else {
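
Plumbing note: the defaults travel from this helper to the mapper and
partitioner tasks through the Hadoop job configuration, keyed by the new
InternalConfigConstants. A tiny round-trip sketch of that mechanism:

    import org.apache.hadoop.conf.Configuration;

    public class ConfRoundTrip {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        // What the helper writes in setUpJob() ...
        conf.set("sorting.default.null.value", "0");
        conf.set("partition.default.null.value", "0");
        // ... and what setup()/setConf() read back on the task side.
        System.out.println(conf.get("sorting.default.null.value"));   // 0
        System.out.println(conf.get("partition.default.null.value")); // 0
      }
    }

Configuration.set rejects a null value, so these keys are written only
inside the branches where the corresponding column is actually
configured, which is also where the default string gets populated.
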
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
index dd132cf..2f97f72 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
@@ -35,6 +36,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
@@ -66,11 +68,23 @@ public class DataPreprocessingHelperTest {
 
     Schema schema = new Schema.SchemaBuilder()
         .addDateTime("time_day", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-    dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "column4",
-        FieldSpec.DataType.INT, 0, 0);
+    dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "0", "column4",
+        FieldSpec.DataType.INT, "0", 0, 0);
 
     Job job = dataPreprocessingHelper.setUpJob();
+    Configuration conf = job.getConfiguration();
     assertNotNull(job);
-    assertNull(job.getConfiguration().get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+    assertNull(conf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+
+    // Validate partitioning configs.
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG), "column2");
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG), "Murmur");
+    assertEquals(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG), "1");
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE), "0");
+
+    // Validate sorting configs.
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG), "column4");
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_TYPE), "INT");
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE), "0");
   }
 }
