You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/05/26 03:57:52 UTC

[incubator-pinot] branch master updated: Support null value fields in generic row ser/de (#6968)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c76ce2a  Support null value fields in generic row ser/de (#6968)
c76ce2a is described below

commit c76ce2aa9616b4b8ac61a99776f56be525e9c0f8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue May 25 20:57:21 2021 -0700

    Support null value fields in generic row ser/de (#6968)
    
    - Replace `GenericRowSerDeUtils` with `GenericRowSerializer` and `GenericRowDeserializer` for the following enhancements:
      - Support null value fields ser/de to preserve the null fields information
      - Cache the encoded string bytes to avoid encoding strings twice
      - Support partial deserialization of values, so that only the needed fields are deserialized when sorting the rows
    - Add `SortOrderComparator` to compare the partially deserialized values
    - Modify `ConcatCollector` to use the new classes
    - Add `GenericRowSerDeTest` to test the new classes
---
 .../processing/collector/ConcatCollector.java      |  65 +++--
 .../processing/collector/SortOrderComparator.java  |  72 +++++
 .../processing/serde/GenericRowDeserializer.java   | 223 +++++++++++++++
 .../processing/serde/GenericRowSerializer.java     | 237 ++++++++++++++++
 .../pinot/core/util/GenericRowSerDeUtils.java      | 316 ---------------------
 .../processing/serde/GenericRowSerDeTest.java      | 137 +++++++++
 .../apache/pinot/spi/data/readers/GenericRow.java  |   7 +
 7 files changed, 717 insertions(+), 340 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
index 7fad934..d760274 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
@@ -26,15 +26,16 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.util.GenericRowSerDeUtils;
+import org.apache.pinot.core.segment.processing.serde.GenericRowDeserializer;
+import org.apache.pinot.core.segment.processing.serde.GenericRowSerializer;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
@@ -46,11 +47,14 @@ public class ConcatCollector implements Collector {
   private static final String RECORDS_FILE_NAME = "collector.records";
 
   private final List<FieldSpec> _fieldSpecs = new ArrayList<>();
-  private final Comparator<GenericRow> _genericRowComparator;
-  private int _numDocs;
-
+  private final GenericRowSerializer _genericRowSerializer;
+  private final int _numSortColumns;
+  private final SortOrderComparator _sortOrderComparator;
   private final File _workingDir;
   private final File _collectorRecordFile;
+
+  private int _numDocs;
+
   // TODO: Avoid using BufferedOutputStream, and use ByteBuffer directly.
   //  However, ByteBuffer has a limitation that the size cannot exceed 2G.
   //  There are no limits on the size of data inserted into the {@link Collector}.
@@ -58,21 +62,38 @@ public class ConcatCollector implements Collector {
   private BufferedOutputStream _collectorRecordOutputStream;
   private List<Long> _collectorRecordOffsets;
   private PinotDataBuffer _collectorRecordBuffer;
+  private GenericRowDeserializer _genericRowDeserializer;
 
   public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
-
-    for (FieldSpec spec : schema.getAllFieldSpecs()) {
-      if (!spec.isVirtualColumn()) {
-        _fieldSpecs.add(spec);
-      }
-    }
     List<String> sortOrder = collectorConfig.getSortOrder();
     if (CollectionUtils.isNotEmpty(sortOrder)) {
-      GenericRowSorter sorter = new GenericRowSorter(sortOrder, schema);
-      _genericRowComparator = sorter.getGenericRowComparator();
+      _numSortColumns = sortOrder.size();
+      DataType[] sortColumnStoredTypes = new DataType[_numSortColumns];
+      for (int i = 0; i < _numSortColumns; i++) {
+        String sortColumn = sortOrder.get(i);
+        FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
+        Preconditions.checkArgument(fieldSpec != null, "Failed to find sort column: %s", sortColumn);
+        Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn);
+        sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType();
+        _fieldSpecs.add(fieldSpec);
+      }
+      _sortOrderComparator = new SortOrderComparator(_numSortColumns, sortColumnStoredTypes);
+      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+        if (!fieldSpec.isVirtualColumn() && !sortOrder.contains(fieldSpec.getName())) {
+          _fieldSpecs.add(fieldSpec);
+        }
+      }
     } else {
-      _genericRowComparator = null;
+      _numSortColumns = 0;
+      _sortOrderComparator = null;
+      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+        if (!fieldSpec.isVirtualColumn()) {
+          _fieldSpecs.add(fieldSpec);
+        }
+      }
     }
+    // TODO: Pass 'includeNullFields' from the config
+    _genericRowSerializer = new GenericRowSerializer(_fieldSpecs, true);
 
     _workingDir =
         new File(FileUtils.getTempDirectory(), String.format("concat_collector_%d", System.currentTimeMillis()));
@@ -84,7 +105,6 @@ public class ConcatCollector implements Collector {
   }
 
   private void initializeBuffer() {
-
     Preconditions.checkState(!_collectorRecordFile.exists(),
         "Collector record file: " + _collectorRecordFile + " already exists");
     try {
@@ -100,7 +120,7 @@ public class ConcatCollector implements Collector {
   @Override
   public void collect(GenericRow genericRow)
       throws IOException {
-    byte[] genericRowBytes = GenericRowSerDeUtils.serializeGenericRow(genericRow, _fieldSpecs);
+    byte[] genericRowBytes = _genericRowSerializer.serialize(genericRow);
     _collectorRecordOutputStream.write(genericRowBytes);
     _collectorRecordOffsets.add(_collectorRecordOffsets.get(_numDocs) + genericRowBytes.length);
     _numDocs++;
@@ -109,14 +129,14 @@ public class ConcatCollector implements Collector {
   @Override
   public Iterator<GenericRow> iterator()
       throws IOException {
-
     _collectorRecordOutputStream.flush();
     _collectorRecordBuffer = PinotDataBuffer
         .mapFile(_collectorRecordFile, true, 0, _collectorRecordOffsets.get(_numDocs), PinotDataBuffer.NATIVE_ORDER,
             "ConcatCollector: generic row buffer");
+    _genericRowDeserializer = new GenericRowDeserializer(_collectorRecordBuffer, _fieldSpecs, true);
 
     // TODO: A lot of this code can be made common across Collectors, once {@link RollupCollector} is also converted to off heap implementation
-    if (_genericRowComparator != null) {
+    if (_numSortColumns != 0) {
       int[] sortedDocIds = new int[_numDocs];
       for (int i = 0; i < _numDocs; i++) {
         sortedDocIds[i] = i;
@@ -125,11 +145,8 @@ public class ConcatCollector implements Collector {
       Arrays.quickSort(0, _numDocs, (i1, i2) -> {
         long startOffset1 = _collectorRecordOffsets.get(sortedDocIds[i1]);
         long startOffset2 = _collectorRecordOffsets.get(sortedDocIds[i2]);
-        GenericRow row1 = GenericRowSerDeUtils
-            .deserializeGenericRow(_collectorRecordBuffer, startOffset1, _fieldSpecs, new GenericRow());
-        GenericRow row2 = GenericRowSerDeUtils
-            .deserializeGenericRow(_collectorRecordBuffer, startOffset2, _fieldSpecs, new GenericRow());
-        return _genericRowComparator.compare(row1, row2);
+        return _sortOrderComparator.compare(_genericRowDeserializer.partialDeserialize(startOffset1, _numSortColumns),
+            _genericRowDeserializer.partialDeserialize(startOffset2, _numSortColumns));
       }, (i1, i2) -> {
         int temp = sortedDocIds[i1];
         sortedDocIds[i1] = sortedDocIds[i2];
@@ -159,7 +176,7 @@ public class ConcatCollector implements Collector {
         } else {
           offset = _collectorRecordOffsets.get(sortedDocIds[_nextDocId++]);
         }
-        return GenericRowSerDeUtils.deserializeGenericRow(_collectorRecordBuffer, offset, _fieldSpecs, _reuse);
+        return _genericRowDeserializer.deserialize(offset, _reuse);
       }
     };
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
new file mode 100644
index 0000000..34b63b6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SortOrderComparator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Comparator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Comparator for values of the sort columns.
+ */
+public class SortOrderComparator implements Comparator<Object[]> {
+  private final int _numSortColumns;
+  private final DataType[] _sortColumnStoredTypes;
+
+  public SortOrderComparator(int numSortColumns, DataType[] sortColumnStoredTypes) {
+    _numSortColumns = numSortColumns;
+    _sortColumnStoredTypes = sortColumnStoredTypes;
+  }
+
+  @Override
+  public int compare(Object[] o1, Object[] o2) {
+    for (int i = 0; i < _numSortColumns; i++) {
+      Object value1 = o1[i];
+      Object value2 = o2[i];
+      int result;
+      switch (_sortColumnStoredTypes[i]) {
+        case INT:
+          result = Integer.compare((int) value1, (int) value2);
+          break;
+        case LONG:
+          result = Long.compare((long) value1, (long) value2);
+          break;
+        case FLOAT:
+          result = Float.compare((float) value1, (float) value2);
+          break;
+        case DOUBLE:
+          result = Double.compare((double) value1, (double) value2);
+          break;
+        case STRING:
+          result = ((String) value1).compareTo((String) value2);
+          break;
+        case BYTES:
+          result = ByteArray.compare((byte[]) value1, (byte[]) value2);
+          break;
+        default:
+          throw new IllegalStateException("Unsupported sort column stored type: " + _sortColumnStoredTypes[i]);
+      }
+      if (result != 0) {
+        return result;
+      }
+    }
+    return 0;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java
new file mode 100644
index 0000000..83a5863
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowDeserializer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.serde;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/**
+ * Utility class to deserialize the {@link GenericRow}.
+ * The bytes are read in NATIVE order. The data should be serialized by the {@link GenericRowSerializer} on the same
+ * host to ensure that both of them are using the same byte order.
+ */
+public class GenericRowDeserializer {
+  private final PinotDataBuffer _dataBuffer;
+  private final int _numFields;
+  private final String[] _fieldNames;
+  private final boolean[] _isSingleValueFields;
+  private final DataType[] _storedTypes;
+  private final boolean _includeNullFields;
+
+  public GenericRowDeserializer(PinotDataBuffer dataBuffer, List<FieldSpec> fieldSpecs, boolean includeNullFields) {
+    _dataBuffer = dataBuffer;
+    _numFields = fieldSpecs.size();
+    _fieldNames = new String[_numFields];
+    _isSingleValueFields = new boolean[_numFields];
+    _storedTypes = new DataType[_numFields];
+    for (int i = 0; i < _numFields; i++) {
+      FieldSpec fieldSpec = fieldSpecs.get(i);
+      _fieldNames[i] = fieldSpec.getName();
+      _isSingleValueFields[i] = fieldSpec.isSingleValueField();
+      _storedTypes[i] = fieldSpec.getDataType().getStoredType();
+    }
+    _includeNullFields = includeNullFields;
+  }
+
+  /**
+   * Deserializes the {@link GenericRow} at the given offset.
+   */
+  public GenericRow deserialize(long offset, GenericRow reuse) {
+    reuse.clear();
+
+    for (int i = 0; i < _numFields; i++) {
+      String fieldName = _fieldNames[i];
+
+      if (_isSingleValueFields[i]) {
+        switch (_storedTypes[i]) {
+          case INT:
+            int intValue = _dataBuffer.getInt(offset);
+            reuse.putValue(fieldName, intValue);
+            offset += Integer.BYTES;
+            break;
+          case LONG:
+            long longValue = _dataBuffer.getLong(offset);
+            reuse.putValue(fieldName, longValue);
+            offset += Long.BYTES;
+            break;
+          case FLOAT:
+            float floatValue = _dataBuffer.getFloat(offset);
+            reuse.putValue(fieldName, floatValue);
+            offset += Float.BYTES;
+            break;
+          case DOUBLE:
+            double doubleValue = _dataBuffer.getDouble(offset);
+            reuse.putValue(fieldName, doubleValue);
+            offset += Double.BYTES;
+            break;
+          case STRING: {
+            int numBytes = _dataBuffer.getInt(offset);
+            offset += Integer.BYTES;
+            byte[] stringBytes = new byte[numBytes];
+            _dataBuffer.copyTo(offset, stringBytes);
+            offset += numBytes;
+            reuse.putValue(fieldName, StringUtils.decodeUtf8(stringBytes));
+            break;
+          }
+          case BYTES: {
+            int numBytes = _dataBuffer.getInt(offset);
+            offset += Integer.BYTES;
+            byte[] bytes = new byte[numBytes];
+            _dataBuffer.copyTo(offset, bytes);
+            offset += numBytes;
+            reuse.putValue(fieldName, bytes);
+            break;
+          }
+          default:
+            throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]);
+        }
+      } else {
+        int numValues = _dataBuffer.getInt(offset);
+        offset += Integer.BYTES;
+        Object[] multiValue = new Object[numValues];
+
+        switch (_storedTypes[i]) {
+          case INT:
+            for (int j = 0; j < numValues; j++) {
+              multiValue[j] = _dataBuffer.getInt(offset);
+              offset += Integer.BYTES;
+            }
+            break;
+          case LONG:
+            for (int j = 0; j < numValues; j++) {
+              multiValue[j] = _dataBuffer.getLong(offset);
+              offset += Long.BYTES;
+            }
+            break;
+          case FLOAT:
+            for (int j = 0; j < numValues; j++) {
+              multiValue[j] = _dataBuffer.getFloat(offset);
+              offset += Float.BYTES;
+            }
+            break;
+          case DOUBLE:
+            for (int j = 0; j < numValues; j++) {
+              multiValue[j] = _dataBuffer.getDouble(offset);
+              offset += Double.BYTES;
+            }
+            break;
+          case STRING:
+            for (int j = 0; j < numValues; j++) {
+              int numBytes = _dataBuffer.getInt(offset);
+              offset += Integer.BYTES;
+              byte[] stringBytes = new byte[numBytes];
+              _dataBuffer.copyTo(offset, stringBytes);
+              offset += numBytes;
+              multiValue[j] = StringUtils.decodeUtf8(stringBytes);
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]);
+        }
+
+        reuse.putValue(fieldName, multiValue);
+      }
+    }
+
+    // Deserialize null fields if enabled
+    if (_includeNullFields) {
+      int numNullFields = _dataBuffer.getInt(offset);
+      offset += Integer.BYTES;
+      for (int i = 0; i < numNullFields; i++) {
+        reuse.addNullValueField(_fieldNames[_dataBuffer.getInt(offset)]);
+        offset += Integer.BYTES;
+      }
+    }
+
+    return reuse;
+  }
+
+  /**
+   * Deserializes the first several fields at the given offset. This method can be used to sort the generic rows without
+   * fully deserializing the whole row for each comparison. The selected fields should all be single-valued.
+   */
+  public Object[] partialDeserialize(long offset, int numFields) {
+    Object[] values = new Object[numFields];
+
+    for (int i = 0; i < numFields; i++) {
+      Preconditions.checkState(_isSingleValueFields[i], "Partial deserialize should not be applied to MV column: %s",
+          _fieldNames[i]);
+      switch (_storedTypes[i]) {
+        case INT:
+          values[i] = _dataBuffer.getInt(offset);
+          offset += Integer.BYTES;
+          break;
+        case LONG:
+          values[i] = _dataBuffer.getLong(offset);
+          offset += Long.BYTES;
+          break;
+        case FLOAT:
+          values[i] = _dataBuffer.getFloat(offset);
+          offset += Float.BYTES;
+          break;
+        case DOUBLE:
+          values[i] = _dataBuffer.getDouble(offset);
+          offset += Double.BYTES;
+          break;
+        case STRING: {
+          int numBytes = _dataBuffer.getInt(offset);
+          offset += Integer.BYTES;
+          byte[] stringBytes = new byte[numBytes];
+          _dataBuffer.copyTo(offset, stringBytes);
+          offset += numBytes;
+          values[i] = StringUtils.decodeUtf8(stringBytes);
+          break;
+        }
+        case BYTES: {
+          int numBytes = _dataBuffer.getInt(offset);
+          offset += Integer.BYTES;
+          byte[] bytes = new byte[numBytes];
+          _dataBuffer.copyTo(offset, bytes);
+          offset += numBytes;
+          values[i] = bytes;
+          break;
+        }
+        default:
+          throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]);
+      }
+    }
+
+    return values;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java
new file mode 100644
index 0000000..7d70201
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerializer.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.serde;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.StringUtils;
+
+
+/**
+ * Utility class to serialize the {@link GenericRow}.
+ * The bytes are stored in NATIVE order. The data should be deserialized by the {@link GenericRowDeserializer} on the
+ * same host to ensure that both of them are using the same byte order.
+ */
+public class GenericRowSerializer {
+  private final int _numFields;
+  private final String[] _fieldNames;
+  private final boolean[] _isSingleValueFields;
+  private final DataType[] _storedTypes;
+  // Cache the encoded string bytes
+  private final Object[] _stringBytes;
+  // Store index for null fields
+  private final Map<String, Integer> _fieldIndexMap;
+  private final int[] _nullFieldIndexes;
+
+  public GenericRowSerializer(List<FieldSpec> fieldSpecs, boolean includeNullFields) {
+    _numFields = fieldSpecs.size();
+    _fieldNames = new String[_numFields];
+    _isSingleValueFields = new boolean[_numFields];
+    _storedTypes = new DataType[_numFields];
+    _stringBytes = new Object[_numFields];
+    for (int i = 0; i < _numFields; i++) {
+      FieldSpec fieldSpec = fieldSpecs.get(i);
+      _fieldNames[i] = fieldSpec.getName();
+      _isSingleValueFields[i] = fieldSpec.isSingleValueField();
+      _storedTypes[i] = fieldSpec.getDataType().getStoredType();
+    }
+    if (includeNullFields) {
+      _fieldIndexMap = new HashMap<>();
+      for (int i = 0; i < _numFields; i++) {
+        _fieldIndexMap.put(_fieldNames[i], i);
+      }
+      _nullFieldIndexes = new int[_numFields];
+    } else {
+      _fieldIndexMap = null;
+      _nullFieldIndexes = null;
+    }
+  }
+
+  /**
+   * Serializes the given {@link GenericRow}.
+   */
+  public byte[] serialize(GenericRow row) {
+    int numBytes = 0;
+
+    // First pass: calculate the number of bytes required
+    for (int i = 0; i < _numFields; i++) {
+      Object value = row.getValue(_fieldNames[i]);
+
+      if (_isSingleValueFields[i]) {
+        switch (_storedTypes[i]) {
+          case INT:
+            numBytes += Integer.BYTES;
+            break;
+          case LONG:
+            numBytes += Long.BYTES;
+            break;
+          case FLOAT:
+            numBytes += Float.BYTES;
+            break;
+          case DOUBLE:
+            numBytes += Double.BYTES;
+            break;
+          case STRING:
+            byte[] stringBytes = StringUtils.encodeUtf8((String) value);
+            numBytes += Integer.BYTES + stringBytes.length;
+            _stringBytes[i] = stringBytes;
+            break;
+          case BYTES:
+            numBytes += Integer.BYTES + ((byte[]) value).length;
+            break;
+          default:
+            throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]);
+        }
+      } else {
+        Object[] multiValue = (Object[]) value;
+        int numValues = multiValue.length;
+        numBytes += Integer.BYTES; // Number of values
+
+        switch (_storedTypes[i]) {
+          case INT:
+            numBytes += Integer.BYTES * numValues;
+            break;
+          case LONG:
+            numBytes += Long.BYTES * numValues;
+            break;
+          case FLOAT:
+            numBytes += Float.BYTES * numValues;
+            break;
+          case DOUBLE:
+            numBytes += Double.BYTES * numValues;
+            break;
+          case STRING:
+            numBytes += Integer.BYTES * numValues;
+            byte[][] stringBytesArray = new byte[numValues][];
+            for (int j = 0; j < numValues; j++) {
+              byte[] stringBytes = StringUtils.encodeUtf8((String) multiValue[j]);
+              numBytes += stringBytes.length;
+              stringBytesArray[j] = stringBytes;
+            }
+            _stringBytes[i] = stringBytesArray;
+            break;
+          default:
+            throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]);
+        }
+      }
+    }
+
+    // Serialize null fields if enabled
+    int numNullFields = 0;
+    if (_fieldIndexMap != null) {
+      Set<String> nullFields = row.getNullValueFields();
+      for (String nullField : nullFields) {
+        Integer nullFieldIndex = _fieldIndexMap.get(nullField);
+        if (nullFieldIndex != null) {
+          _nullFieldIndexes[numNullFields++] = nullFieldIndex;
+        }
+      }
+      numBytes += Integer.BYTES * (1 + numNullFields);
+    }
+
+    byte[] serializedBytes = new byte[numBytes];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes).order(PinotDataBuffer.NATIVE_ORDER);
+
+    // Second pass: serialize the values
+    for (int i = 0; i < _numFields; i++) {
+      Object value = row.getValue(_fieldNames[i]);
+
+      if (_isSingleValueFields[i]) {
+        switch (_storedTypes[i]) {
+          case INT:
+            byteBuffer.putInt((int) value);
+            break;
+          case LONG:
+            byteBuffer.putLong((long) value);
+            break;
+          case FLOAT:
+            byteBuffer.putFloat((float) value);
+            break;
+          case DOUBLE:
+            byteBuffer.putDouble((double) value);
+            break;
+          case STRING:
+            byte[] stringBytes = (byte[]) _stringBytes[i];
+            byteBuffer.putInt(stringBytes.length);
+            byteBuffer.put(stringBytes);
+            break;
+          case BYTES:
+            byte[] bytes = (byte[]) value;
+            byteBuffer.putInt(bytes.length);
+            byteBuffer.put(bytes);
+            break;
+          default:
+            throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]);
+        }
+      } else {
+        Object[] multiValue = (Object[]) value;
+        byteBuffer.putInt(multiValue.length);
+
+        switch (_storedTypes[i]) {
+          case INT:
+            for (Object element : multiValue) {
+              byteBuffer.putInt((int) element);
+            }
+            break;
+          case LONG:
+            for (Object element : multiValue) {
+              byteBuffer.putLong((long) element);
+            }
+            break;
+          case FLOAT:
+            for (Object element : multiValue) {
+              byteBuffer.putFloat((float) element);
+            }
+            break;
+          case DOUBLE:
+            for (Object element : multiValue) {
+              byteBuffer.putDouble((double) element);
+            }
+            break;
+          case STRING:
+            byte[][] stringBytesArray = (byte[][]) _stringBytes[i];
+            for (byte[] stringBytes : stringBytesArray) {
+              byteBuffer.putInt(stringBytes.length);
+              byteBuffer.put(stringBytes);
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]);
+        }
+      }
+    }
+
+    // Serialize null fields if enabled
+    if (_fieldIndexMap != null) {
+      byteBuffer.putInt(numNullFields);
+      for (int i = 0; i < numNullFields; i++) {
+        byteBuffer.putInt(_nullFieldIndexes[i]);
+      }
+    }
+
+    return serializedBytes;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GenericRowSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GenericRowSerDeUtils.java
deleted file mode 100644
index 2505c8c..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GenericRowSerDeUtils.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.util;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.StringUtils;
-
-
-/**
- * Utility methods for serde of {@link GenericRow}
- * Deserialization assumes it is deserializing from a {@link PinotDataBuffer}
- */
-public final class GenericRowSerDeUtils {
-
-  private GenericRowSerDeUtils() {
-
-  }
-
-  /**
-   * Serialize the given GenericRow. The data is stored in native byte order.
-   * @param genericRow GenericRow to serialize
-   * @param fieldSpecs the fields to serialize
-   * @return serialized bytes
-   */
-  public static byte[] serializeGenericRow(GenericRow genericRow, List<FieldSpec> fieldSpecs) {
-    int numBytes = 0;
-
-    for (FieldSpec fieldSpec : fieldSpecs) {
-      Object value = genericRow.getValue(fieldSpec.getName());
-
-      if (fieldSpec.isSingleValueField()) {
-        switch (fieldSpec.getDataType().getStoredType()) {
-
-          case INT:
-            numBytes += Integer.BYTES;
-            break;
-          case LONG:
-            numBytes += Long.BYTES;
-            break;
-          case FLOAT:
-            numBytes += Float.BYTES;
-            break;
-          case DOUBLE:
-            numBytes += Double.BYTES;
-            break;
-          case STRING:
-            byte[] stringBytes = StringUtils.encodeUtf8((String) value);
-            numBytes += Integer.BYTES; // string length
-            numBytes += stringBytes.length;
-            break;
-          case BYTES:
-            numBytes += Integer.BYTES; // byte array length
-            numBytes += ((byte[]) value).length;
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                String.format("DataType '%s' not supported", fieldSpec.getDataType()));
-        }
-      } else {
-        Object[] multiValue = (Object[]) value;
-        numBytes += Integer.BYTES; // array length
-
-        switch (fieldSpec.getDataType().getStoredType()) {
-          case INT:
-            numBytes += Integer.BYTES * multiValue.length;
-            break;
-          case LONG:
-            numBytes += Long.BYTES * multiValue.length;
-            break;
-          case FLOAT:
-            numBytes += Float.BYTES * multiValue.length;
-            break;
-          case DOUBLE:
-            numBytes += Double.BYTES * multiValue.length;
-            break;
-          case STRING:
-            for (Object element : multiValue) {
-              byte[] stringBytes = StringUtils.encodeUtf8((String) element);
-              numBytes += Integer.BYTES; // string length
-              numBytes += stringBytes.length;
-            }
-            break;
-          case BYTES:
-            for (Object element : multiValue) {
-              numBytes += Integer.BYTES; // byte array length
-              numBytes += ((byte[]) element).length;
-            }
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                String.format("DataType '%s' not supported", fieldSpec.getDataType()));
-        }
-      }
-    }
-
-    byte[] genericRowBytes = new byte[numBytes];
-    ByteBuffer byteBuffer = ByteBuffer.wrap(genericRowBytes).order(PinotDataBuffer.NATIVE_ORDER);
-
-    for (FieldSpec fieldSpec : fieldSpecs) {
-      Object value = genericRow.getValue(fieldSpec.getName());
-
-      if (fieldSpec.isSingleValueField()) {
-        switch (fieldSpec.getDataType().getStoredType()) {
-
-          case INT:
-            byteBuffer.putInt((int) value);
-            break;
-          case LONG:
-            byteBuffer.putLong((long) value);
-            break;
-          case FLOAT:
-            byteBuffer.putFloat((float) value);
-            break;
-          case DOUBLE:
-            byteBuffer.putDouble((double) value);
-            break;
-          case STRING:
-            byte[] stringBytes = StringUtils.encodeUtf8((String) value);
-            byteBuffer.putInt(stringBytes.length);
-            byteBuffer.put(stringBytes);
-            break;
-          case BYTES:
-            byte[] bytes = (byte[]) value;
-            byteBuffer.putInt(bytes.length);
-            byteBuffer.put(bytes);
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                String.format("DataType '%s' not supported", fieldSpec.getDataType()));
-        }
-      } else {
-        Object[] multiValue = (Object[]) value;
-        byteBuffer.putInt(multiValue.length);
-
-        switch (fieldSpec.getDataType().getStoredType()) {
-
-          case INT:
-            for (Object element : multiValue) {
-              byteBuffer.putInt((int) element);
-            }
-            break;
-          case LONG:
-            for (Object element : multiValue) {
-              byteBuffer.putLong((long) element);
-            }
-            break;
-          case FLOAT:
-            for (Object element : multiValue) {
-              byteBuffer.putFloat((float) element);
-            }
-            break;
-          case DOUBLE:
-            for (Object element : multiValue) {
-              byteBuffer.putDouble((double) element);
-            }
-            break;
-          case STRING:
-            for (Object element : multiValue) {
-              byte[] stringBytes = StringUtils.encodeUtf8((String) element);
-              byteBuffer.putInt(stringBytes.length);
-              byteBuffer.put(stringBytes);
-            }
-            break;
-          case BYTES:
-            for (Object element : multiValue) {
-              byte[] bytes = (byte[]) element;
-              byteBuffer.putInt(bytes.length);
-              byteBuffer.put(bytes);
-            }
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                String.format("DataType '%s' not supported", fieldSpec.getDataType()));
-        }
-      }
-    }
-    return genericRowBytes;
-  }
-
-  /**
-   * Deserializes bytes from the native order data buffer to GenericRow
-   * @param dataBuffer the pinot data buffer
-   * @param offset offset to begin reading from
-   * @param fieldSpecs list of field specs to determine fields in deserialization
-   * @param reuse GenericRow object for returning
-   * @return Deserialized GenericRow
-   */
-  public static GenericRow deserializeGenericRow(PinotDataBuffer dataBuffer, long offset, List<FieldSpec> fieldSpecs,
-      GenericRow reuse) {
-    for (FieldSpec fieldSpec : fieldSpecs) {
-      String fieldName = fieldSpec.getName();
-      if (fieldSpec.isSingleValueField()) {
-        switch (fieldSpec.getDataType().getStoredType()) {
-
-          case INT:
-            int intValue = dataBuffer.getInt(offset);
-            reuse.putValue(fieldName, intValue);
-            offset += Integer.BYTES;
-            break;
-          case LONG:
-            long longValue = dataBuffer.getLong(offset);
-            reuse.putValue(fieldName, longValue);
-            offset += Long.BYTES;
-            break;
-          case FLOAT:
-            float floatValue = dataBuffer.getFloat(offset);
-            reuse.putValue(fieldName, floatValue);
-            offset += Float.BYTES;
-            break;
-          case DOUBLE:
-            double doubleValue = dataBuffer.getDouble(offset);
-            reuse.putValue(fieldName, doubleValue);
-            offset += Double.BYTES;
-            break;
-          case STRING:
-            int stringSize = dataBuffer.getInt(offset);
-            offset += Integer.BYTES;
-            byte[] stringBytes = new byte[stringSize];
-            dataBuffer.copyTo(offset, stringBytes);
-            offset += stringSize;
-            reuse.putValue(fieldName, StringUtils.decodeUtf8(stringBytes));
-            break;
-          case BYTES:
-            int bytesSize = dataBuffer.getInt(offset);
-            offset += Integer.BYTES;
-            byte[] bytes = new byte[bytesSize];
-            dataBuffer.copyTo(offset, bytes);
-            offset += bytesSize;
-            reuse.putValue(fieldName, bytes);
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                String.format("DataType '%s' not supported", fieldSpec.getDataType()));
-        }
-      } else {
-
-        int numMultiValues = dataBuffer.getInt(offset);
-        offset += Integer.BYTES;
-        Object[] values = new Object[numMultiValues];
-
-        switch (fieldSpec.getDataType().getStoredType()) {
-
-          case INT:
-            for (int i = 0; i < numMultiValues; i++) {
-              values[i] = dataBuffer.getInt(offset);
-              offset += Integer.BYTES;
-            }
-            break;
-          case LONG:
-            for (int i = 0; i < numMultiValues; i++) {
-              values[i] = dataBuffer.getLong(offset);
-              offset += Long.BYTES;
-            }
-            break;
-          case FLOAT:
-            for (int i = 0; i < numMultiValues; i++) {
-              values[i] = dataBuffer.getFloat(offset);
-              offset += Float.BYTES;
-            }
-            break;
-          case DOUBLE:
-            for (int i = 0; i < numMultiValues; i++) {
-              values[i] = dataBuffer.getDouble(offset);
-              offset += Double.BYTES;
-            }
-            break;
-          case STRING:
-            for (int i = 0; i < numMultiValues; i++) {
-              int stringSize = dataBuffer.getInt(offset);
-              offset += Integer.BYTES;
-              byte[] stringBytes = new byte[stringSize];
-              dataBuffer.copyTo(offset, stringBytes);
-              offset += stringSize;
-              values[i] = StringUtils.decodeUtf8(stringBytes);
-            }
-            break;
-          case BYTES:
-            for (int i = 0; i < numMultiValues; i++) {
-              int bytesSize = dataBuffer.getInt(offset);
-              offset += Integer.BYTES;
-              byte[] bytes = new byte[bytesSize];
-              dataBuffer.copyTo(offset, bytes);
-              offset += bytesSize;
-              values[i] = bytes;
-            }
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                String.format("DataType '%s' not supported", fieldSpec.getDataType()));
-        }
-        reuse.putValue(fieldName, values);
-      }
-    }
-    return reuse;
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java
new file mode 100644
index 0000000..fc2d8ab
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/serde/GenericRowSerDeTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.serde;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class GenericRowSerDeTest { // NOTE(review): direct buffers allocated in the tests are never closed
+  private List<FieldSpec> _fieldSpecs; // 13 specs built in setUp(): 7 *SV fields followed by 6 *MV fields
+  private GenericRow _row; // shared input row that every test serializes
+
+  @BeforeClass
+  public void setUp() {
+    _fieldSpecs = Arrays.asList(new DimensionFieldSpec("intSV", DataType.INT, true),
+        new DimensionFieldSpec("longSV", DataType.LONG, true), new DimensionFieldSpec("floatSV", DataType.FLOAT, true),
+        new DimensionFieldSpec("doubleSV", DataType.DOUBLE, true),
+        new DimensionFieldSpec("stringSV", DataType.STRING, true),
+        new DimensionFieldSpec("bytesSV", DataType.BYTES, true), new DimensionFieldSpec("nullSV", DataType.INT, true),
+        new DimensionFieldSpec("intMV", DataType.INT, false), new DimensionFieldSpec("longMV", DataType.LONG, false),
+        new DimensionFieldSpec("floatMV", DataType.FLOAT, false),
+        new DimensionFieldSpec("doubleMV", DataType.DOUBLE, false),
+        new DimensionFieldSpec("stringMV", DataType.STRING, false),
+        new DimensionFieldSpec("nullMV", DataType.LONG, false));
+
+    _row = new GenericRow();
+    _row.putValue("intSV", 123);
+    _row.putValue("longSV", 123L);
+    _row.putValue("floatSV", 123.0f);
+    _row.putValue("doubleSV", 123.0);
+    _row.putValue("stringSV", "123");
+    _row.putValue("bytesSV", new byte[]{1, 2, 3});
+    _row.putDefaultNullValue("nullSV", Integer.MAX_VALUE); // expected to round-trip as a null value field
+    _row.putValue("intMV", new Object[]{123, 456});
+    _row.putValue("longMV", new Object[]{123L, 456L});
+    _row.putValue("floatMV", new Object[]{123.0f, 456.0f});
+    _row.putValue("doubleMV", new Object[]{123.0, 456.0});
+    _row.putValue("stringMV", new Object[]{"123", "456"});
+    _row.putDefaultNullValue("nullMV", new Object[]{Long.MIN_VALUE}); // MV counterpart of nullSV
+  }
+
+  @Test
+  public void testSerDeWithoutNullFields() {
+    GenericRowSerializer serializer = new GenericRowSerializer(_fieldSpecs, false);
+    byte[] bytes = serializer.serialize(_row);
+    PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
+    dataBuffer.readFrom(0L, bytes); // presumably copies the serialized bytes into the buffer at offset 0
+    GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, false);
+    GenericRow reuse = new GenericRow();
+    deserializer.deserialize(0L, reuse);
+    Map<String, Object> actualValueMap = reuse.getFieldToValueMap();
+    Map<String, Object> expectedValueMap = _row.getFieldToValueMap();
+    // NOTE: Cannot directly assert equals on maps because they contain arrays
+    assertEquals(actualValueMap.size(), expectedValueMap.size());
+    for (Map.Entry<String, Object> entry : expectedValueMap.entrySet()) {
+      assertEquals(actualValueMap.get(entry.getKey()), entry.getValue());
+    }
+    assertTrue(reuse.getNullValueFields().isEmpty()); // with the 'false' flag no null value fields are restored
+  }
+
+  @Test
+  public void testSerDeWithNullFields() {
+    GenericRowSerializer serializer = new GenericRowSerializer(_fieldSpecs, true);
+    byte[] bytes = serializer.serialize(_row);
+    PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
+    dataBuffer.readFrom(0L, bytes);
+    GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, true);
+    GenericRow reuse = new GenericRow();
+    deserializer.deserialize(0L, reuse);
+    assertEquals(reuse, _row); // relies on GenericRow#equals; presumably covers values and null fields - confirm
+  }
+
+  @Test
+  public void testSerDeWithPartialFields() {
+    List<FieldSpec> fieldSpecs = Arrays.asList(new DimensionFieldSpec("intSV", DataType.INT, true),
+        new DimensionFieldSpec("nullSV", DataType.INT, true));
+    GenericRowSerializer serializer = new GenericRowSerializer(fieldSpecs, true);
+    byte[] bytes = serializer.serialize(_row);
+    PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
+    dataBuffer.readFrom(0L, bytes);
+    GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, fieldSpecs, true);
+    GenericRow reuse = new GenericRow();
+    deserializer.deserialize(0L, reuse);
+    Map<String, Object> fieldToValueMap = reuse.getFieldToValueMap();
+    assertEquals(fieldToValueMap.size(), 2); // only the two fields in fieldSpecs come back, not all of _row
+    assertEquals(fieldToValueMap.get("intSV"), _row.getValue("intSV"));
+    assertEquals(fieldToValueMap.get("nullSV"), _row.getValue("nullSV"));
+    Set<String> nullValueFields = reuse.getNullValueFields();
+    assertEquals(nullValueFields, Collections.singleton("nullSV")); // nullMV is not in fieldSpecs
+  }
+
+  @Test
+  public void testPartialDeserialize() {
+    GenericRowSerializer serializer = new GenericRowSerializer(_fieldSpecs, true);
+    byte[] bytes = serializer.serialize(_row);
+    PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
+    dataBuffer.readFrom(0L, bytes);
+    GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, true);
+    Object[] values = deserializer.partialDeserialize(0L, 7); // 7 values checked; matches the 7 leading SV specs
+    assertEquals(values[0], _row.getValue("intSV"));
+    assertEquals(values[1], _row.getValue("longSV"));
+    assertEquals(values[2], _row.getValue("floatSV"));
+    assertEquals(values[3], _row.getValue("doubleSV"));
+    assertEquals(values[4], _row.getValue("stringSV"));
+    assertEquals(values[5], _row.getValue("bytesSV"));
+    assertEquals(values[6], _row.getValue("nullSV")); // value of the null-marked field is still deserialized
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index f4875ec..a88886c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -198,6 +198,13 @@ public class GenericRow implements Serializable {
   }
 
   /**
+   * Marks a field as {@code null}: only records the name in {@code _nullValueFields}, no value is stored.
+   */
+  public void addNullValueField(String fieldName) {
+    _nullValueFields.add(fieldName);
+  }
+
+  /**
    * Removes all the fields from the row.
    */
   public void clear() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org