You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/11/10 19:37:19 UTC

[pinot] branch support-default-null-value-in-preprocessing created (now 87f83bf)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch support-default-null-value-in-preprocessing
in repository https://gitbox.apache.org/repos/asf/pinot.git.


      at 87f83bf  Support default null value in data preprocessing job

This branch includes the following new commits:

     new 87f83bf  Support default null value in data preprocessing job

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Support default null value in data preprocessing job

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch support-default-null-value-in-preprocessing
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 87f83bf548ef51b93bf96fba9a3aa6fa25741b67
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Nov 10 11:36:45 2021 -0800

    Support default null value in data preprocessing job
---
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  |  9 ++++-
 .../pinot/hadoop/job/InternalConfigConstants.java  |  2 ++
 .../job/mappers/AvroDataPreprocessingMapper.java   | 19 +++++-----
 .../job/mappers/OrcDataPreprocessingMapper.java    | 11 +++---
 .../AvroDataPreprocessingPartitioner.java          | 40 ++++++++++++---------
 .../OrcDataPreprocessingPartitioner.java           | 42 ++++++++++++++--------
 .../job/preprocess/DataPreprocessingHelper.java    |  9 ++++-
 .../preprocess/DataPreprocessingHelperTest.java    | 20 +++++++++--
 8 files changed, 103 insertions(+), 49 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 0a6e10e..85f5c6c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -63,9 +63,11 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   private String _partitionColumn;
   private int _numPartitions;
   private String _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
 
   private String _sortingColumn;
   private FieldSpec.DataType _sortingColumnType;
+  private String _sortingColumnDefaultNullValue;
 
   private int _numOutputFiles;
   private int _maxNumRecordsPerFile;
@@ -101,7 +103,7 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
     dataPreprocessingHelper
         .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
-            _sortingColumn, _sortingColumnType,
+            _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
             _numOutputFiles, _maxNumRecordsPerFile);
 
     Job job = dataPreprocessingHelper.setUpJob();
@@ -156,6 +158,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         _partitionColumn = columnPartitionMap.keySet().iterator().next();
         _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
         _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+        _partitionColumnDefaultNullValue =
+            _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
       }
     } else {
       LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
@@ -201,6 +205,9 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
       }
     }
+    if (_sortingColumn != null) {
+      _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
+    }
   }
 
   private void fetchResizingConfig() {
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index ef898e3..b26fc38 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -41,9 +41,11 @@ public class InternalConfigConstants {
   public static final String PARTITION_COLUMN_CONFIG = "partition.column";
   public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
   public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+  public static final String PARTITION_COLUMN_DEFAULT_NULL_VALUE = "partition.default.null.value";
 
   public static final String SORTING_COLUMN_CONFIG = "sorting.column";
   public static final String SORTING_COLUMN_TYPE = "sorting.type";
+  public static final String SORTING_COLUMN_DEFAULT_NULL_VALUE = "sorting.default.null.value";
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
 
   @Deprecated
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
index 8d2a3eb..d9f17ac 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class AvroDataPreprocessingMapper
 
   private String _sortingColumn = null;
   private FieldSpec.DataType _sortingColumnType = null;
+  private String _sortingColumnDefaultNullValue = null;
   private AvroRecordExtractor _avroRecordExtractor;
 
   @Override
@@ -51,8 +52,9 @@ public class AvroDataPreprocessingMapper
     if (sortingColumnConfig != null) {
       _sortingColumn = sortingColumnConfig;
       _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
-      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
-          _sortingColumnType);
+      _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+          _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
     } else {
       LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
     }
@@ -64,18 +66,17 @@ public class AvroDataPreprocessingMapper
     GenericRecord record = key.datum();
     if (_sortingColumn != null) {
       Object object = record.get(_sortingColumn);
-      Preconditions
-          .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
-              record);
-      Object convertedValue = _avroRecordExtractor.convert(object);
-      Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+      Object valueToConvert = object != null ? _avroRecordExtractor.convert(object) : _sortingColumnDefaultNullValue;
+      Preconditions.checkState(valueToConvert != null, "Invalid value: %s for sorting column: %s in record: %s", object,
           _sortingColumn, record);
+
       WritableComparable outputKey;
       try {
-        outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
       } catch (Exception e) {
         throw new IllegalStateException(
-            String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+            String
+                .format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
             e);
       }
       context.write(outputKey, new AvroValue<>(record));
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
index d7d0694..8ad9d84 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
   private final OrcValue _valueWrapper = new OrcValue();
   private String _sortingColumn = null;
   private FieldSpec.DataType _sortingColumnType = null;
+  private String _sortingColumnDefaultNullValue = null;
   private int _sortingColumnId = -1;
 
   @Override
@@ -50,8 +51,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
     if (sortingColumnConfig != null) {
       _sortingColumn = sortingColumnConfig;
       _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
-      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
-          _sortingColumnType);
+      _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+          _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
     } else {
       LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
     }
@@ -72,8 +74,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
       WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
       WritableComparable outputKey;
       try {
-        outputKey = DataPreprocessingUtils
-            .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+        Object valueToConvert =
+            sortingColumnValue != null ? OrcUtils.convert(sortingColumnValue) : _sortingColumnDefaultNullValue;
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
       } catch (Exception e) {
         throw new IllegalStateException(String
             .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
index f0c05db..22c4054 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.hadoop.job.partitioners;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.hadoop.conf.Configurable;
@@ -38,22 +39,26 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
 
   private Configuration _conf;
   private String _partitionColumn;
+  private int _numPartitions;
   private PartitionFunction _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
   private AvroRecordExtractor _avroRecordExtractor;
 
+  private final AtomicInteger _counter = new AtomicInteger(0);
+
   @Override
   public void setConf(Configuration conf) {
     _conf = conf;
     _avroRecordExtractor = new AvroRecordExtractor();
     _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
     String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
-    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
-    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    _numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, _numPartitions);
+    _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
     LOGGER.info(
         "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions:"
-            + " {}",
-        _partitionColumn,
-        partitionFunctionName, numPartitions);
+            + " {}, default null value: {}",
+        _partitionColumn, partitionFunctionName, _numPartitions, _partitionColumnDefaultNullValue);
   }
 
   @Override
@@ -65,16 +70,19 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
   public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
     GenericRecord record = value.datum();
     Object object = record.get(_partitionColumn);
-    Preconditions
-        .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
-            record);
-    Object convertedValue = _avroRecordExtractor.convert(object);
-    Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
-        _partitionColumn, record);
-    Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
-        "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
-        convertedValue.getClass(), record);
-    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
-    return _partitionFunction.getPartition(convertedValue.toString());
+    if (object == null || object.toString().equals(_partitionColumnDefaultNullValue)) {
+      return Math.abs(_counter.getAndIncrement()) % _numPartitions;
+    } else {
+      Object convertedValue = _avroRecordExtractor.convert(object);
+      Preconditions
+          .checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
+              _partitionColumn, record);
+      Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+          "Value for partition column: %s must be either a Number or a String, found: %s in record: %s",
+          _partitionColumn,
+          convertedValue.getClass(), record);
+      // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+      return _partitionFunction.getPartition(convertedValue.toString());
+    }
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
index 914d145..b70d8f1 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.partitioners;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
@@ -38,21 +39,25 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
 
   private Configuration _conf;
   private String _partitionColumn;
+  private int _numPartitions;
   private PartitionFunction _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
   private int _partitionColumnId = -1;
 
+  private final AtomicInteger _counter = new AtomicInteger(0);
+
   @Override
   public void setConf(Configuration conf) {
     _conf = conf;
     _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
     String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
-    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
-    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    _numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
+    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, _numPartitions);
+    _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
     LOGGER.info(
         "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: "
-            + "{}",
-        _partitionColumn,
-        partitionFunctionName, numPartitions);
+            + "{}, default null value: {}",
+        _partitionColumn, partitionFunctionName, _numPartitions, _partitionColumnDefaultNullValue);
   }
 
   @Override
@@ -71,16 +76,23 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
       LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
     }
     WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
-    Object convertedValue;
-    try {
-      convertedValue = OrcUtils.convert(partitionColumnValue);
-    } catch (Exception e) {
-      throw new IllegalStateException(
-          String.format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
-              _partitionColumn, _partitionColumnId, orcStruct),
-          e);
+    String convertedValueString = null;
+    if (partitionColumnValue != null) {
+      try {
+        Object convertedValue = OrcUtils.convert(partitionColumnValue);
+        convertedValueString = convertedValue.toString();
+      } catch (Exception e) {
+        throw new IllegalStateException(String
+            .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
+                _partitionColumn, _partitionColumnId, orcStruct), e);
+      }
+    }
+
+    if (convertedValueString == null || convertedValueString.equals(_partitionColumnDefaultNullValue)) {
+      return Math.abs(_counter.getAndIncrement()) % _numPartitions;
+    } else {
+      // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+      return _partitionFunction.getPartition(convertedValueString);
     }
-    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
-    return _partitionFunction.getPartition(convertedValue.toString());
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
index 287c51b..ae50168 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
@@ -58,9 +58,11 @@ public abstract class DataPreprocessingHelper {
   String _partitionColumn;
   int _numPartitions;
   String _partitionFunction;
+  String _partitionColumnDefaultNullValue;
 
   String _sortingColumn;
   private FieldSpec.DataType _sortingColumnType;
+  String _sortingColumnDefaultNullValue;
 
   private int _numOutputFiles;
   private int _maxNumRecordsPerFile;
@@ -79,16 +81,19 @@ public abstract class DataPreprocessingHelper {
   }
 
   public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
-      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
       int maxNumRecordsPerFile) {
     _tableConfig = tableConfig;
     _pinotTableSchema = tableSchema;
     _partitionColumn = partitionColumn;
     _numPartitions = numPartitions;
     _partitionFunction = partitionFunction;
+    _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
 
     _sortingColumn = sortingColumn;
     _sortingColumnType = sortingColumnType;
+    _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
 
     _numOutputFiles = numOutputFiles;
     _maxNumRecordsPerFile = maxNumRecordsPerFile;
@@ -113,6 +118,7 @@ public abstract class DataPreprocessingHelper {
       LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
       jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
       jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, _sortingColumnDefaultNullValue);
 
       switch (_sortingColumnType) {
         case INT:
@@ -148,6 +154,7 @@ public abstract class DataPreprocessingHelper {
       if (_partitionFunction != null) {
         jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
       }
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, _partitionColumnDefaultNullValue);
       jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
       job.setPartitionerClass(getPartitioner());
     } else {
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
index dd132cf..2f97f72 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
@@ -35,6 +36,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
@@ -66,11 +68,23 @@ public class DataPreprocessingHelperTest {
 
     Schema schema = new Schema.SchemaBuilder()
         .addDateTime("time_day", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-    dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "column4",
-        FieldSpec.DataType.INT, 0, 0);
+    dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "0", "column4",
+        FieldSpec.DataType.INT, "0", 0, 0);
 
     Job job = dataPreprocessingHelper.setUpJob();
+    Configuration conf = job.getConfiguration();
     assertNotNull(job);
-    assertNull(job.getConfiguration().get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+    assertNull(conf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+
+    // Validate partitioning configs.
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG), "column2");
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG), "Murmur");
+    assertEquals(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG), "1");
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE), "0");
+
+    // Validate sorting configs.
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG), "column4");
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_TYPE), "INT");
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE), "0");
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org