You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/11/12 23:06:08 UTC

[pinot] branch master updated: Support default null value in data preprocessing job (#7739)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 068549c  Support default null value in data preprocessing job (#7739)
068549c is described below

commit 068549c185eacea4b8e0eb4126bb62cc33157f15
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Fri Nov 12 15:05:53 2021 -0800

    Support default null value in data preprocessing job (#7739)
    
    Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  |  9 ++-
 .../pinot/hadoop/job/InternalConfigConstants.java  |  2 +
 .../job/mappers/AvroDataPreprocessingMapper.java   | 19 +++---
 .../job/mappers/OrcDataPreprocessingMapper.java    | 11 ++--
 .../AvroDataPreprocessingPartitioner.java          | 56 ++++++++++++------
 .../OrcDataPreprocessingPartitioner.java           | 68 ++++++++++++++--------
 .../job/preprocess/DataPreprocessingHelper.java    | 11 +++-
 .../preprocess/DataPreprocessingHelperTest.java    | 20 ++++++-
 8 files changed, 135 insertions(+), 61 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 0a6e10e..85f5c6c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -63,9 +63,11 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   private String _partitionColumn;
   private int _numPartitions;
   private String _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
 
   private String _sortingColumn;
   private FieldSpec.DataType _sortingColumnType;
+  private String _sortingColumnDefaultNullValue;
 
   private int _numOutputFiles;
   private int _maxNumRecordsPerFile;
@@ -101,7 +103,7 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
     dataPreprocessingHelper
         .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
-            _sortingColumn, _sortingColumnType,
+            _partitionColumnDefaultNullValue, _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue,
             _numOutputFiles, _maxNumRecordsPerFile);
 
     Job job = dataPreprocessingHelper.setUpJob();
@@ -156,6 +158,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         _partitionColumn = columnPartitionMap.keySet().iterator().next();
         _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn);
         _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn);
+        _partitionColumnDefaultNullValue =
+            _pinotTableSchema.getFieldSpecFor(_partitionColumn).getDefaultNullValueString();
       }
     } else {
       LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
@@ -201,6 +205,9 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
         LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
       }
     }
+    if (_sortingColumn != null) {
+      _sortingColumnDefaultNullValue = _pinotTableSchema.getFieldSpecFor(_sortingColumn).getDefaultNullValueString();
+    }
   }
 
   private void fetchResizingConfig() {
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index ef898e3..b26fc38 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -41,9 +41,11 @@ public class InternalConfigConstants {
   public static final String PARTITION_COLUMN_CONFIG = "partition.column";
   public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
   public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+  public static final String PARTITION_COLUMN_DEFAULT_NULL_VALUE = "partition.default.null.value";
 
   public static final String SORTING_COLUMN_CONFIG = "sorting.column";
   public static final String SORTING_COLUMN_TYPE = "sorting.type";
+  public static final String SORTING_COLUMN_DEFAULT_NULL_VALUE = "sorting.default.null.value";
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
 
   @Deprecated
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
index 8d2a3eb..d9f17ac 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class AvroDataPreprocessingMapper
 
   private String _sortingColumn = null;
   private FieldSpec.DataType _sortingColumnType = null;
+  private String _sortingColumnDefaultNullValue = null;
   private AvroRecordExtractor _avroRecordExtractor;
 
   @Override
@@ -51,8 +52,9 @@ public class AvroDataPreprocessingMapper
     if (sortingColumnConfig != null) {
       _sortingColumn = sortingColumnConfig;
       _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
-      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
-          _sortingColumnType);
+      _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+      LOGGER.info("Initialized AvroDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+          _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
     } else {
       LOGGER.info("Initialized AvroDataPreprocessingMapper without sorting column");
     }
@@ -64,18 +66,17 @@ public class AvroDataPreprocessingMapper
     GenericRecord record = key.datum();
     if (_sortingColumn != null) {
       Object object = record.get(_sortingColumn);
-      Preconditions
-          .checkState(object != null, "Failed to find value for sorting column: %s in record: %s", _sortingColumn,
-              record);
-      Object convertedValue = _avroRecordExtractor.convert(object);
-      Preconditions.checkState(convertedValue != null, "Invalid value: %s for sorting column: %s in record: %s", object,
+      Object valueToConvert = object != null ? _avroRecordExtractor.convert(object) : _sortingColumnDefaultNullValue;
+      Preconditions.checkState(valueToConvert != null, "Invalid value: %s for sorting column: %s in record: %s", object,
           _sortingColumn, record);
+
       WritableComparable outputKey;
       try {
-        outputKey = DataPreprocessingUtils.convertToWritableComparable(convertedValue, _sortingColumnType);
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
       } catch (Exception e) {
         throw new IllegalStateException(
-            String.format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
+            String
+                .format("Caught exception while processing sorting column: %s in record: %s", _sortingColumn, record),
             e);
       }
       context.write(outputKey, new AvroValue<>(record));
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
index d7d0694..8ad9d84 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
@@ -41,6 +41,7 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
   private final OrcValue _valueWrapper = new OrcValue();
   private String _sortingColumn = null;
   private FieldSpec.DataType _sortingColumnType = null;
+  private String _sortingColumnDefaultNullValue = null;
   private int _sortingColumnId = -1;
 
   @Override
@@ -50,8 +51,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
     if (sortingColumnConfig != null) {
       _sortingColumn = sortingColumnConfig;
       _sortingColumnType = FieldSpec.DataType.valueOf(configuration.get(InternalConfigConstants.SORTING_COLUMN_TYPE));
-      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}", _sortingColumn,
-          _sortingColumnType);
+      _sortingColumnDefaultNullValue = configuration.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE);
+      LOGGER.info("Initialized OrcDataPreprocessingMapper with sortingColumn: {} of type: {}, default null value: {}",
+          _sortingColumn, _sortingColumnType, _sortingColumnDefaultNullValue);
     } else {
       LOGGER.info("Initialized OrcDataPreprocessingMapper without sorting column");
     }
@@ -72,8 +74,9 @@ public class OrcDataPreprocessingMapper extends Mapper<NullWritable, OrcStruct,
       WritableComparable sortingColumnValue = value.getFieldValue(_sortingColumnId);
       WritableComparable outputKey;
       try {
-        outputKey = DataPreprocessingUtils
-            .convertToWritableComparable(OrcUtils.convert(sortingColumnValue), _sortingColumnType);
+        Object valueToConvert =
+            sortingColumnValue != null ? OrcUtils.convert(sortingColumnValue) : _sortingColumnDefaultNullValue;
+        outputKey = DataPreprocessingUtils.convertToWritableComparable(valueToConvert, _sortingColumnType);
       } catch (Exception e) {
         throw new IllegalStateException(String
             .format("Caught exception while processing sorting column: %s, id: %d in ORC struct: %s", _sortingColumn,
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
index f0c05db..5f2ae4d 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
@@ -19,11 +19,13 @@
 package org.apache.pinot.hadoop.job.partitioners;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
@@ -39,7 +41,11 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
   private Configuration _conf;
   private String _partitionColumn;
   private PartitionFunction _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
   private AvroRecordExtractor _avroRecordExtractor;
+  private int _numReducers = -1;
+
+  private final AtomicInteger _counter = new AtomicInteger(0);
 
   @Override
   public void setConf(Configuration conf) {
@@ -47,13 +53,19 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
     _avroRecordExtractor = new AvroRecordExtractor();
     _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
     String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
-    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
-    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    String numPartitionsString = conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG);
+    int numPartitions = -1;
+    if (_partitionColumn != null) {
+      numPartitions = Integer.parseInt(numPartitionsString);
+      _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    } else {
+      _numReducers = Integer.parseInt(conf.get(MRJobConfig.NUM_REDUCES));
+    }
+    _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
     LOGGER.info(
         "Initialized AvroDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions:"
-            + " {}",
-        _partitionColumn,
-        partitionFunctionName, numPartitions);
+            + " {}, default null value: {}",
+        _partitionColumn, partitionFunctionName, numPartitions, _partitionColumnDefaultNullValue);
   }
 
   @Override
@@ -63,18 +75,26 @@ public class AvroDataPreprocessingPartitioner extends Partitioner<WritableCompar
 
   @Override
   public int getPartition(WritableComparable key, AvroValue<GenericRecord> value, int numPartitions) {
-    GenericRecord record = value.datum();
-    Object object = record.get(_partitionColumn);
-    Preconditions
-        .checkState(object != null, "Failed to find value for partition column: %s in record: %s", _partitionColumn,
-            record);
-    Object convertedValue = _avroRecordExtractor.convert(object);
-    Preconditions.checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
-        _partitionColumn, record);
-    Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
-        "Value for partition column: %s must be either a Number or a String, found: %s in record: %s", _partitionColumn,
-        convertedValue.getClass(), record);
-    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
-    return _partitionFunction.getPartition(convertedValue.toString());
+    if (_partitionColumn == null) {
+      return Math.abs(_counter.getAndIncrement()) % _numReducers;
+    } else {
+      GenericRecord record = value.datum();
+      Object object = record.get(_partitionColumn);
+      String convertedValueString;
+      if (object == null) {
+        convertedValueString = _partitionColumnDefaultNullValue;
+      } else {
+        Object convertedValue = _avroRecordExtractor.convert(object);
+        Preconditions
+            .checkState(convertedValue != null, "Invalid value: %s for partition column: %s in record: %s", object,
+                _partitionColumn, record);
+        Preconditions.checkState(convertedValue instanceof Number || convertedValue instanceof String,
+            "Value for partition column: %s must be either a Number or a String, found: %s in record: %s",
+            _partitionColumn, convertedValue.getClass(), record);
+        convertedValueString = convertedValue.toString();
+      }
+      // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+      return _partitionFunction.getPartition(convertedValueString);
+    }
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
index 914d145..853502c 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
@@ -20,9 +20,11 @@ package org.apache.pinot.hadoop.job.partitioners;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
@@ -39,20 +41,30 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
   private Configuration _conf;
   private String _partitionColumn;
   private PartitionFunction _partitionFunction;
+  private String _partitionColumnDefaultNullValue;
   private int _partitionColumnId = -1;
+  private int _numReducers = -1;
+
+  private final AtomicInteger _counter = new AtomicInteger(0);
 
   @Override
   public void setConf(Configuration conf) {
     _conf = conf;
     _partitionColumn = conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG);
     String partitionFunctionName = conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG);
-    int numPartitions = Integer.parseInt(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG));
-    _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    String numPartitionsString = conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG);
+    int numPartitions = -1;
+    if (_partitionColumn != null) {
+      numPartitions = Integer.parseInt(numPartitionsString);
+      _partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
+    } else {
+      _numReducers = Integer.parseInt(conf.get(MRJobConfig.NUM_REDUCES));
+    }
+    _partitionColumnDefaultNullValue = conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE);
     LOGGER.info(
         "Initialized OrcDataPreprocessingPartitioner with partitionColumn: {}, partitionFunction: {}, numPartitions: "
-            + "{}",
-        _partitionColumn,
-        partitionFunctionName, numPartitions);
+            + "{}, default null value: {}",
+        _partitionColumn, partitionFunctionName, numPartitions, _partitionColumnDefaultNullValue);
   }
 
   @Override
@@ -62,25 +74,33 @@ public class OrcDataPreprocessingPartitioner extends Partitioner<WritableCompara
 
   @Override
   public int getPartition(WritableComparable key, OrcValue value, int numPartitions) {
-    OrcStruct orcStruct = (OrcStruct) value.value;
-    if (_partitionColumnId == -1) {
-      List<String> fieldNames = orcStruct.getSchema().getFieldNames();
-      _partitionColumnId = fieldNames.indexOf(_partitionColumn);
-      Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s",
-          _partitionColumn, fieldNames);
-      LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
-    }
-    WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
-    Object convertedValue;
-    try {
-      convertedValue = OrcUtils.convert(partitionColumnValue);
-    } catch (Exception e) {
-      throw new IllegalStateException(
-          String.format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
-              _partitionColumn, _partitionColumnId, orcStruct),
-          e);
+    if (_partitionColumn == null) {
+      return Math.abs(_counter.getAndIncrement()) % _numReducers;
+    } else {
+      OrcStruct orcStruct = (OrcStruct) value.value;
+      if (_partitionColumnId == -1) {
+        List<String> fieldNames = orcStruct.getSchema().getFieldNames();
+        _partitionColumnId = fieldNames.indexOf(_partitionColumn);
+        Preconditions.checkState(_partitionColumnId != -1, "Failed to find partition column: %s in the ORC fields: %s",
+            _partitionColumn, fieldNames);
+        LOGGER.info("Field id for partition column: {} is: {}", _partitionColumn, _partitionColumnId);
+      }
+      WritableComparable partitionColumnValue = orcStruct.getFieldValue(_partitionColumnId);
+      String convertedValueString;
+      if (partitionColumnValue == null) {
+        convertedValueString = _partitionColumnDefaultNullValue;
+      } else {
+        try {
+          Object convertedValue = OrcUtils.convert(partitionColumnValue);
+          convertedValueString = convertedValue.toString();
+        } catch (Exception e) {
+          throw new IllegalStateException(String
+              .format("Caught exception while processing partition column: %s, id: %d in ORC struct: %s",
+                  _partitionColumn, _partitionColumnId, orcStruct), e);
+        }
+      }
+      // NOTE: Always partition with String type value because Broker uses String type value to prune segments
+      return _partitionFunction.getPartition(convertedValueString);
     }
-    // NOTE: Always partition with String type value because Broker uses String type value to prune segments
-    return _partitionFunction.getPartition(convertedValue.toString());
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
index 287c51b..dc51e71 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
@@ -58,9 +58,11 @@ public abstract class DataPreprocessingHelper {
   String _partitionColumn;
   int _numPartitions;
   String _partitionFunction;
+  String _partitionColumnDefaultNullValue;
 
   String _sortingColumn;
   private FieldSpec.DataType _sortingColumnType;
+  String _sortingColumnDefaultNullValue;
 
   private int _numOutputFiles;
   private int _maxNumRecordsPerFile;
@@ -79,16 +81,19 @@ public abstract class DataPreprocessingHelper {
   }
 
   public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
-      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      String partitionFunction, String partitionColumnDefaultNullValue, String sortingColumn,
+      FieldSpec.DataType sortingColumnType, String sortingColumnDefaultNullValue, int numOutputFiles,
       int maxNumRecordsPerFile) {
     _tableConfig = tableConfig;
     _pinotTableSchema = tableSchema;
     _partitionColumn = partitionColumn;
     _numPartitions = numPartitions;
     _partitionFunction = partitionFunction;
+    _partitionColumnDefaultNullValue = partitionColumnDefaultNullValue;
 
     _sortingColumn = sortingColumn;
     _sortingColumnType = sortingColumnType;
+    _sortingColumnDefaultNullValue = sortingColumnDefaultNullValue;
 
     _numOutputFiles = numOutputFiles;
     _maxNumRecordsPerFile = maxNumRecordsPerFile;
@@ -113,6 +118,7 @@ public abstract class DataPreprocessingHelper {
       LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
       jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn);
       jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name());
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE, _sortingColumnDefaultNullValue);
 
       switch (_sortingColumnType) {
         case INT:
@@ -148,8 +154,8 @@ public abstract class DataPreprocessingHelper {
       if (_partitionFunction != null) {
         jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction);
       }
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE, _partitionColumnDefaultNullValue);
       jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
-      job.setPartitionerClass(getPartitioner());
     } else {
       if (_numOutputFiles > 0) {
         numReduceTasks = _numOutputFiles;
@@ -158,6 +164,7 @@ public abstract class DataPreprocessingHelper {
         numReduceTasks = _inputDataPaths.size();
       }
     }
+    job.setPartitionerClass(getPartitioner());
     // Maximum number of records per output file
     jobConf
         .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, Integer.toString(_maxNumRecordsPerFile));
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
index dd132cf..2f97f72 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
@@ -35,6 +36,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
@@ -66,11 +68,23 @@ public class DataPreprocessingHelperTest {
 
     Schema schema = new Schema.SchemaBuilder()
         .addDateTime("time_day", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-    dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "column4",
-        FieldSpec.DataType.INT, 0, 0);
+    dataPreprocessingHelper.registerConfigs(tableConfig, schema, "column2", 1, "Murmur", "0", "column4",
+        FieldSpec.DataType.INT, "0", 0, 0);
 
     Job job = dataPreprocessingHelper.setUpJob();
+    Configuration conf = job.getConfiguration();
     assertNotNull(job);
-    assertNull(job.getConfiguration().get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+    assertNull(conf.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+
+    // Validate partitioning configs.
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_CONFIG), "column2");
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_FUNCTION_CONFIG), "Murmur");
+    assertEquals(conf.get(InternalConfigConstants.NUM_PARTITIONS_CONFIG), "1");
+    assertEquals(conf.get(InternalConfigConstants.PARTITION_COLUMN_DEFAULT_NULL_VALUE), "0");
+
+    // Validate sorting configs.
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_CONFIG), "column4");
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_TYPE), "INT");
+    assertEquals(conf.get(InternalConfigConstants.SORTING_COLUMN_DEFAULT_NULL_VALUE), "0");
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org