Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/20 23:00:49 UTC

(pinot) branch master updated: Explicit null handling (#11960)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 35748a013b Explicit null handling (#11960)
35748a013b is described below

commit 35748a013b486b3526f7f6d53e6e1bee930c6e6a
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Tue Nov 21 00:00:44 2023 +0100

    Explicit null handling (#11960)
---
 .../apache/pinot/common/data/FieldSpecTest.java    |  45 +++
 .../org/apache/pinot/common/data/SchemaTest.java   |  26 ++
 .../query/selection/SelectionOperatorUtils.java    |   2 +
 .../src/test/resources/test_null_handling.schema   |   1 -
 .../org/apache/pinot/query/type/TypeFactory.java   |  63 ++--
 .../query/queries/ResourceBasedQueryPlansTest.java |   8 +-
 .../apache/pinot/query/type/TypeFactoryTest.java   | 199 +++++++++++-
 .../query/runtime/queries/QueryRunnerTest.java     |   3 +-
 .../query/runtime/queries/QueryRunnerTestBase.java |  58 +++-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  14 +-
 .../testutils/MockInstanceDataManagerFactory.java  |   8 +-
 .../src/test/resources/queries/CountDistinct.json  |   8 +-
 .../test/resources/queries/MetadataTestQuery.json  |   2 +-
 .../src/test/resources/queries/NullHandling.json   | 338 ++++++++++++++++++++-
 .../immutable/ImmutableSegmentLoader.java          |  13 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |  12 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  32 +-
 .../segment/index/loader/IndexLoadingConfig.java   |   4 +
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  14 +-
 .../index/nullvalue/NullValueIndexType.java        |  41 ++-
 .../index/nullvalue/NullValueIndexTypeTest.java    |  66 ++++
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  31 +-
 .../java/org/apache/pinot/spi/data/Schema.java     |  67 +++-
 23 files changed, 962 insertions(+), 93 deletions(-)

diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
index 75cd4d9cdc..91641e1afa 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.common.data;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -404,4 +406,47 @@ public class FieldSpecTest {
     jsonString.append('}');
     return jsonString.toString();
   }
+
+  @DataProvider(name = "nullableCases")
+  public static Object[][] nullableCases() {
+    return new Object[][] {
+        //            declared notNull value; null means the property is omitted
+        new Object[] {null},
+        new Object[] {false},
+        new Object[] {true}
+    };
+  }
+
+  @Test(dataProvider = "nullableCases")
+  void testNullability(Boolean declared)
+      throws IOException {
+    boolean expected = declared == Boolean.TRUE;
+    String json;
+    if (declared == null) {
+      json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\"}";
+    } else {
+      json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\", \"notNull\": " + declared + "}";
+    }
+    DimensionFieldSpec fieldSpec = JsonUtils.stringToObject(json, DimensionFieldSpec.class);
+
+    Assert.assertEquals(fieldSpec.isNotNull(), expected, "Unexpected notNull read when declared as " + declared);
+    Assert.assertEquals(fieldSpec.isNullable(), !expected, "Unexpected nullable read when declared as " + declared);
+  }
+
+  @Test(dataProvider = "nullableCases")
+  void testNullabilityIdempotency(Boolean declared)
+      throws JsonProcessingException {
+    String json;
+    if (declared == null) {
+      json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\"}";
+    } else {
+      json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\", \"notNull\": " + declared + "}";
+    }
+    DimensionFieldSpec fieldSpec = JsonUtils.stringToObject(json, DimensionFieldSpec.class);
+
+    String serialized = JsonUtils.objectToString(fieldSpec);
+    DimensionFieldSpec deserialized = JsonUtils.stringToObject(serialized, DimensionFieldSpec.class);
+
+    Assert.assertEquals(deserialized, fieldSpec, "Changes detected while checking serialize/deserialize idempotency");
+  }
 }
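
The new tests above pin down the JSON contract: a field spec's nullability is controlled by an optional notNull flag
that defaults to false (nullable) when omitted, and isNotNull()/isNullable() are complements. A minimal round-trip
sketch using the same JsonUtils and DimensionFieldSpec APIs exercised by the tests:

    import org.apache.pinot.spi.data.DimensionFieldSpec;
    import org.apache.pinot.spi.utils.JsonUtils;

    public class NotNullRoundTrip {
      public static void main(String[] args) throws Exception {
        // "notNull" omitted: the field defaults to nullable
        DimensionFieldSpec nullable = JsonUtils.stringToObject(
            "{\"name\": \"col1\", \"dataType\": \"BOOLEAN\"}", DimensionFieldSpec.class);
        System.out.println(nullable.isNullable()); // true

        // "notNull": true flips both accessors
        DimensionFieldSpec notNull = JsonUtils.stringToObject(
            "{\"name\": \"col1\", \"dataType\": \"BOOLEAN\", \"notNull\": true}", DimensionFieldSpec.class);
        System.out.println(notNull.isNotNull()); // true
      }
    }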
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
index e2e02ff193..626a091005 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.common.data;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Timestamp;
@@ -33,6 +35,7 @@ import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec.TimeFormat;
 import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -320,6 +323,29 @@ public class SchemaTest {
     Assert.assertNotEquals(jsonSchemaToCompare, jsonSchema);
   }
 
+  @Test
+  public void testSerializeDeserializeOptions()
+      throws IOException {
+    String json = "{\n"
+        + "  \"primaryKeyColumns\" : null,\n"
+        + "  \"timeFieldSpec\" : null,\n"
+        + "  \"schemaName\" : null,\n"
+        + "  \"enableColumnBasedNullHandling\" : true,\n"
+        + "  \"dimensionFieldSpecs\" : [ ],\n"
+        + "  \"metricFieldSpecs\" : [ ],\n"
+        + "  \"dateTimeFieldSpecs\" : [ ]\n"
+        + "}";
+    JsonNode expectedNode = JsonUtils.stringToJsonNode(json);
+
+    Schema schema = JsonUtils.jsonNodeToObject(expectedNode, Schema.class);
+    Assert.assertTrue(schema.isEnableColumnBasedNullHandling(), "Column null handling should be enabled");
+
+    String serialized = JsonUtils.objectToString(schema);
+    Schema deserialized = JsonUtils.stringToObject(serialized, Schema.class);
+
+    Assert.assertEquals(deserialized, schema, "Changes detected while checking serialize/deserialize idempotency");
+  }
+
   @Test
   public void testSimpleDateFormat()
       throws Exception {
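
The schema-level flag round-trips the same way. A short sketch, assuming Schema deserializes from a minimal JSON via
the same JsonUtils API:

    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.utils.JsonUtils;

    public class SchemaNullHandlingFlag {
      public static void main(String[] args) throws Exception {
        String json = "{\"schemaName\": \"example\", \"enableColumnBasedNullHandling\": true}";
        Schema schema = JsonUtils.stringToObject(json, Schema.class);
        // The flag switches null handling from table-level to column-level semantics
        System.out.println(schema.isEnableColumnBasedNullHandling()); // true
      }
    }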
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 681fbe44b8..9d4bc6dd8b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -354,6 +354,8 @@ public class SelectionOperatorUtils {
 
   /**
    * Build a {@link DataTable} from a {@link Collection} of selection rows with {@link DataSchema}. (Server side)
+   *
+   * This method is allowed to modify the given rows. Specifically, it may remove null cells from them.
    */
   public static DataTable getDataTableFromRows(Collection<Object[]> rows, DataSchema dataSchema,
       boolean nullHandlingEnabled)
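
Since the javadoc now allows getDataTableFromRows to strip null cells from the rows it receives, callers that reuse
those rows afterwards should hand over a copy. A hypothetical caller-side sketch (the defensive copy is the point;
the helper name is illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pinot.common.datatable.DataTable;
    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.core.query.selection.SelectionOperatorUtils;

    public class DefensiveRows {
      static DataTable toDataTable(List<Object[]> rows, DataSchema schema) throws Exception {
        List<Object[]> copy = new ArrayList<>(rows.size());
        for (Object[] row : rows) {
          copy.add(row.clone()); // shallow clone is enough: cells may be removed, not mutated in place
        }
        return SelectionOperatorUtils.getDataTableFromRows(copy, schema, /* nullHandlingEnabled */ true);
      }
    }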
diff --git a/pinot-integration-tests/src/test/resources/test_null_handling.schema b/pinot-integration-tests/src/test/resources/test_null_handling.schema
index 2113389249..1e9c0ec10b 100644
--- a/pinot-integration-tests/src/test/resources/test_null_handling.schema
+++ b/pinot-integration-tests/src/test/resources/test_null_handling.schema
@@ -30,4 +30,3 @@
   },
   "schemaName": "mytable"
 }
-
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
index cf64492125..ccc0f41a08 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.type;
 
 import java.util.Map;
+import java.util.function.Predicate;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -31,8 +32,8 @@ import org.apache.pinot.spi.data.Schema;
  * Extends Java-base TypeFactory from Calcite.
  *
  * <p>{@link JavaTypeFactoryImpl} is used here because we are not overriding much of the TypeFactory methods
- * required by Calcite. We will start extending {@link SqlTypeFactoryImpl} or even {@link RelDataTypeFactory}
- * when necessary for Pinot to override such mechanism.
+ * required by Calcite. We will start extending {@link org.apache.calcite.sql.type.SqlTypeFactoryImpl} or even
+ * {@link org.apache.calcite.rel.type.RelDataTypeFactory} when necessary for Pinot to override such mechanisms.
  *
  * <p>Noted that {@link JavaTypeFactoryImpl} is subject to change. Please pay extra attention to this class when
  * upgrading Calcite versions.
@@ -45,20 +46,36 @@ public class TypeFactory extends JavaTypeFactoryImpl {
 
   public RelDataType createRelDataTypeFromSchema(Schema schema) {
     Builder builder = new Builder(this);
+    Predicate<FieldSpec> isNullable;
+    if (schema.isEnableColumnBasedNullHandling()) {
+      isNullable = FieldSpec::isNullable;
+    } else {
+      isNullable = fieldSpec -> false;
+    }
     for (Map.Entry<String, FieldSpec> e : schema.getFieldSpecMap().entrySet()) {
-      builder.add(e.getKey(), toRelDataType(e.getValue()));
+      builder.add(e.getKey(), toRelDataType(e.getValue(), isNullable));
     }
     return builder.build();
   }
 
-  private RelDataType toRelDataType(FieldSpec fieldSpec) {
+  private RelDataType toRelDataType(FieldSpec fieldSpec, Predicate<FieldSpec> isNullable) {
+    RelDataType type = createSqlType(getSqlTypeName(fieldSpec));
+    boolean isArray = !fieldSpec.isSingleValueField();
+    if (isArray) {
+      type = createArrayType(type, -1);
+    }
+    if (isNullable.test(fieldSpec)) {
+      type = createTypeWithNullability(type, true);
+    }
+    return type;
+  }
+
+  private SqlTypeName getSqlTypeName(FieldSpec fieldSpec) {
     switch (fieldSpec.getDataType()) {
       case INT:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.INTEGER)
-            : createArrayType(createSqlType(SqlTypeName.INTEGER), -1);
+        return SqlTypeName.INTEGER;
       case LONG:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BIGINT)
-            : createArrayType(createSqlType(SqlTypeName.BIGINT), -1);
+        return SqlTypeName.BIGINT;
       // Map float and double to the same RelDataType so that queries like
       // `select count(*) from table where aFloatColumn = 0.05` work correctly in the multi-stage query engine.
       //
@@ -71,34 +88,32 @@ public class TypeFactory extends JavaTypeFactoryImpl {
       // With float and double mapped to the same RelDataType, the behavior in the multi-stage query engine will be
       // the same as in the v1 query engine.
       case FLOAT:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE)
-            : createArrayType(createSqlType(SqlTypeName.REAL), -1);
+        if (fieldSpec.isSingleValueField()) {
+          return SqlTypeName.DOUBLE;
+        } else {
+          // TODO: This may be wrong. The reason we use DOUBLE for single-value float may also apply here
+          return SqlTypeName.REAL;
+        }
       case DOUBLE:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE)
-            : createArrayType(createSqlType(SqlTypeName.DOUBLE), -1);
+        return SqlTypeName.DOUBLE;
       case BOOLEAN:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BOOLEAN)
-            : createArrayType(createSqlType(SqlTypeName.BOOLEAN), -1);
+        return SqlTypeName.BOOLEAN;
       case TIMESTAMP:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.TIMESTAMP)
-            : createArrayType(createSqlType(SqlTypeName.TIMESTAMP), -1);
+        return SqlTypeName.TIMESTAMP;
       case STRING:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARCHAR)
-            : createArrayType(createSqlType(SqlTypeName.VARCHAR), -1);
+        return SqlTypeName.VARCHAR;
       case BYTES:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARBINARY)
-            : createArrayType(createSqlType(SqlTypeName.VARBINARY), -1);
+        return SqlTypeName.VARBINARY;
       case BIG_DECIMAL:
-        return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DECIMAL)
-            : createArrayType(createSqlType(SqlTypeName.DECIMAL), -1);
+        return SqlTypeName.DECIMAL;
       case JSON:
-        return createSqlType(SqlTypeName.VARCHAR);
+        return SqlTypeName.VARCHAR;
       case LIST:
        // TODO: support LIST; MV columns should fall into this category.
       case STRUCT:
       case MAP:
       default:
-        String message = String.format("Unsupported type: %s ", fieldSpec.getDataType().toString());
+        String message = String.format("Unsupported type: %s ", fieldSpec.getDataType());
         throw new UnsupportedOperationException(message);
     }
   }
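
With this refactoring, nullability is decided once per schema: a column maps to a nullable RelDataType only when the
schema enables column-based null handling and the FieldSpec itself is nullable. A small sketch of the resulting
behavior, using the API shown in this diff:

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.pinot.query.type.TypeFactory;
    import org.apache.pinot.query.type.TypeSystem;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;

    public class NullableRowType {
      public static void main(String[] args) {
        Schema schema = new Schema.SchemaBuilder()
            .setSchemaName("example")
            .addSingleValueDimension("col", FieldSpec.DataType.INT)
            .setEnableColumnBasedNullHandling(true) // without this, every column is non-nullable
            .build();
        RelDataType rowType = new TypeFactory(new TypeSystem()).createRelDataTypeFromSchema(schema);
        // A column with no explicit notNull flag is treated as nullable
        System.out.println(rowType.getFieldList().get(0).getType().isNullable()); // true
      }
    }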
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
index 562e3d23fb..fa58d62291 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
@@ -47,12 +47,14 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
   private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
 
   @Test(dataProvider = "testResourceQueryPlannerTestCaseProviderHappyPath")
-  public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, String query, String output) {
+  public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, String description, String query,
+      String output) {
     try {
       long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
       String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
       Assert.assertEquals(explainedPlan, output,
-          String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output));
+          String.format("Test case %s for query %s (%s) doesn't match expected output: %s", testCaseName, description,
+              query, output));
      // use a regex to strip the EXPLAIN clause from the query
       String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
       DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(queryWithoutExplainPlan);
@@ -105,7 +107,7 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
           String sql = queryCase._sql;
           List<String> orgOutput = queryCase._output;
           String concatenatedOutput = StringUtils.join(orgOutput, "");
-          Object[] testEntry = new Object[]{testCaseName, sql, concatenatedOutput};
+          Object[] testEntry = new Object[]{testCaseName, queryCase._description, sql, concatenatedOutput};
           providerContent.add(testEntry);
         }
       }
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java
index 698f02591e..8c1686b2c8 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java
@@ -18,7 +18,11 @@
  */
 package org.apache.pinot.query.type;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.ArraySqlType;
@@ -27,12 +31,203 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
 public class TypeFactoryTest {
-  private static final TypeSystem TYPE_SYSTEM = new TypeSystem() {
-  };
+  private static final TypeSystem TYPE_SYSTEM = new TypeSystem();
+
+  @DataProvider(name = "relDataTypeConversion")
+  public Iterator<Object[]> relDataTypeConversion() {
+    ArrayList<Object[]> cases = new ArrayList<>();
+
+    JavaTypeFactory javaTypeFactory = new JavaTypeFactoryImpl(TYPE_SYSTEM);
+
+    for (FieldSpec.DataType dataType : FieldSpec.DataType.values()) {
+      RelDataType basicType;
+      RelDataType arrayType = null;
+      switch (dataType) {
+        case INT: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.INTEGER);
+          break;
+        }
+        case LONG: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.BIGINT);
+          break;
+        }
+        // Map float and double to the same RelDataType so that queries like
+        // `select count(*) from table where aFloatColumn = 0.05` work correctly in the multi-stage query engine.
+        //
+        // If float and double are mapped to different RelDataTypes,
+        // `select count(*) from table where aFloatColumn = 0.05` will be converted to
+        // `select count(*) from table where CAST(aFloatColumn as "DOUBLE") = 0.05`. Since casting
+        // from float to double does not always produce the same double value as the original float value, this leads
+        // to wrong query results.
+        //
+        // With float and double mapped to the same RelDataType, the behavior in the multi-stage query engine will be
+        // the same as in the v1 query engine.
+        case FLOAT: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+          arrayType = javaTypeFactory.createSqlType(SqlTypeName.REAL);
+          break;
+        }
+        case DOUBLE: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+          break;
+        }
+        case BOOLEAN: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+          break;
+        }
+        case TIMESTAMP: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+          break;
+        }
+        case STRING:
+        case JSON: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+          break;
+        }
+        case BYTES: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.VARBINARY);
+          break;
+        }
+        case BIG_DECIMAL: {
+          basicType = javaTypeFactory.createSqlType(SqlTypeName.DECIMAL);
+          break;
+        }
+        case LIST:
+        case STRUCT:
+        case MAP:
+        case UNKNOWN:
+          continue;
+        default:
+          String message = String.format("Unsupported type: %s ", dataType);
+          throw new UnsupportedOperationException(message);
+      }
+      if (arrayType == null) {
+        arrayType = basicType;
+      }
+      cases.add(new Object[]{dataType, basicType, arrayType, true});
+      cases.add(new Object[]{dataType, basicType, arrayType, false});
+    }
+    return cases.iterator();
+  }
+
+  @Test(dataProvider = "relDataTypeConversion")
+  public void testScalarTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+      boolean columnNullMode) {
+    TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+    Schema testSchema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("col", dataType)
+        .setEnableColumnBasedNullHandling(columnNullMode)
+        .build();
+    RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+    List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+    RelDataTypeField field = fieldList.get(0);
+    boolean colNullable = isColNullable(testSchema);
+    Assert.assertEquals(field.getType(), typeFactory.createTypeWithNullability(scalarType, colNullable));
+  }
+
+  @Test(dataProvider = "relDataTypeConversion")
+  public void testNullableScalarTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+      boolean columnNullMode) {
+    TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+    Schema testSchema = new Schema.SchemaBuilder()
+        .addDimensionField("col", dataType, field -> field.setNullable(true))
+        .setEnableColumnBasedNullHandling(columnNullMode)
+        .build();
+    RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+    List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+    RelDataTypeField field = fieldList.get(0);
+
+    boolean colNullable = isColNullable(testSchema);
+    RelDataType expectedType = typeFactory.createTypeWithNullability(scalarType, colNullable);
+
+    Assert.assertEquals(field.getType(), expectedType);
+  }
+
+
+  @Test(dataProvider = "relDataTypeConversion")
+  public void testNotNullableScalarTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+      boolean columnNullMode) {
+    TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+    Schema testSchema = new Schema.SchemaBuilder()
+        .addDimensionField("col", dataType, field -> field.setNullable(false))
+        .setEnableColumnBasedNullHandling(columnNullMode)
+        .build();
+    RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+    List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+    RelDataTypeField field = fieldList.get(0);
+
+    Assert.assertEquals(field.getType(), scalarType);
+  }
+
+  private boolean isColNullable(Schema schema) {
+    return schema.isEnableColumnBasedNullHandling() && schema.getFieldSpecFor("col").isNullable();
+  }
+
+  @Test(dataProvider = "relDataTypeConversion")
+  public void testArrayTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+      boolean columnNullMode) {
+    TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+    Schema testSchema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("col", dataType)
+        .setEnableColumnBasedNullHandling(columnNullMode)
+        .build();
+    RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+    List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+    RelDataTypeField field = fieldList.get(0);
+
+    boolean nullable = isColNullable(testSchema);
+    RelDataType expectedType =
+        typeFactory.createTypeWithNullability(typeFactory.createArrayType(arrayType, -1), nullable);
+
+    Assert.assertEquals(field.getType(), expectedType);
+  }
+
+  @Test(dataProvider = "relDataTypeConversion")
+  public void testNullableArrayTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+      boolean columnNullMode) {
+    TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+    Schema testSchema = new Schema.SchemaBuilder()
+        .addDimensionField("col", dataType, field -> {
+          field.setNullable(true);
+          field.setSingleValueField(false);
+        })
+        .setEnableColumnBasedNullHandling(columnNullMode)
+        .build();
+    RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+    List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+    RelDataTypeField field = fieldList.get(0);
+
+    boolean colNullable = isColNullable(testSchema);
+    RelDataType expectedType =
+        typeFactory.createTypeWithNullability(typeFactory.createArrayType(arrayType, -1), colNullable);
+
+    Assert.assertEquals(field.getType(), expectedType);
+  }
+
+  @Test(dataProvider = "relDataTypeConversion")
+  public void testNotNullableArrayTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+      boolean columnNullMode) {
+    TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+    Schema testSchema = new Schema.SchemaBuilder()
+        .addDimensionField("col", dataType, field -> {
+          field.setNullable(false);
+          field.setSingleValueField(false);
+        })
+        .setEnableColumnBasedNullHandling(columnNullMode)
+        .build();
+    RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+    List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+    RelDataTypeField field = fieldList.get(0);
+
+    RelDataType expectedType = typeFactory.createArrayType(arrayType, -1);
+
+    Assert.assertEquals(field.getType(), expectedType);
+  }
 
   @Test
   public void testRelDataTypeConversion() {
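
The float/double comment above is easy to verify in isolation: widening a float to double rarely lands exactly on
the double literal a user writes, so a CAST-based comparison would silently miss rows. A standalone illustration:

    public class FloatDoubleWidening {
      public static void main(String[] args) {
        float f = 0.05f;
        double widened = f; // implicit widening, equivalent to CAST(aFloatColumn AS "DOUBLE")
        System.out.println(widened);         // 0.05000000074505806
        System.out.println(widened == 0.05); // false: does not match the double literal
      }
    }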
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index f25984f5fd..5a36f30c5a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -62,7 +62,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
     SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
         .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
         .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
-        .addMetric("col3", FieldSpec.DataType.INT, 0).setSchemaName("defaultSchemaName");
+        .addMetric("col3", FieldSpec.DataType.INT, 0).setSchemaName("defaultSchemaName")
+        .setEnableColumnBasedNullHandling(true);
   }
 
   public static List<GenericRow> buildRows(String tableName) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 8837e97a0e..4d6a79052c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.query.runtime.queries;
 
+import com.fasterxml.jackson.annotation.JsonAnySetter;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.math.DoubleMath;
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -204,9 +206,10 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
   protected void compareRowEquals(ResultTable resultTable, List<Object[]> expectedRows, boolean keepOutputRowsInOrder) {
     List<Object[]> resultRows = resultTable.getRows();
     int numRows = resultRows.size();
-    assertEquals(numRows, expectedRows.size(), String.format("Mismatched number of results. expected: %s, actual: %s",
-        expectedRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n")),
-        resultRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n"))));
+    assertEquals(numRows, expectedRows.size(),
+        String.format("Mismatched number of results.\nExpected Rows:\n%s\nActual Rows:\n%s",
+            expectedRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n")),
+            resultRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n"))));
 
     DataSchema dataSchema = resultTable.getDataSchema();
     resultRows.forEach(row -> canonicalizeRow(dataSchema, row));
@@ -219,12 +222,12 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
       Object[] resultRow = resultRows.get(i);
       Object[] expectedRow = expectedRows.get(i);
       assertEquals(resultRow.length, expectedRow.length,
-          String.format("Unexpected row size mismatch. Expected: %s, Actual: %s", Arrays.toString(expectedRow),
-              Arrays.toString(resultRow)));
+          String.format("Mismatched row size at row id: %d. Expected Row: %s, Actual Row: %s", i,
+              Arrays.toString(expectedRow), Arrays.toString(resultRow)));
       for (int j = 0; j < resultRow.length; j++) {
         assertTrue(typeCompatibleFuzzyEquals(dataSchema.getColumnDataType(j), resultRow[j], expectedRow[j]),
-            "Not match at (" + i + "," + j + ")! Expected: " + Arrays.toString(expectedRow) + " Actual: "
-                + Arrays.toString(resultRow));
+            String.format("Mismatched value at row id: %d, column id: %d. Expected Row: %s, Actual Row: %s", i, j,
+                Arrays.toString(expectedRow), Arrays.toString(resultRow)));
       }
     }
   }
@@ -407,7 +410,11 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
     // TODO: ts is built-in, but we should allow user overwrite
     builder.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:SECONDS");
     builder.setSchemaName(schemaName);
-    return builder.build();
+    Schema schema = builder.build();
+    for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) {
+      schema.getFieldSpecMap().get(columnAndType._name).setNotNull(columnAndType._notNull);
+    }
+    return schema;
   }
 
   protected List<GenericRow> toRow(List<QueryTestCase.ColumnAndType> columnAndTypes, List<List<Object>> value) {
@@ -540,7 +547,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
     @JsonProperty("queries")
     public List<Query> _queries;
     @JsonProperty("extraProps")
-    public Map<String, Object> _extraProps = Collections.emptyMap();
+    public ExtraProperties _extraProps = new ExtraProperties();
 
     public static class Table {
       @JsonProperty("schema")
@@ -581,6 +588,39 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
       String _type;
       @JsonProperty("isSingleValue")
       boolean _isSingleValue = true;
+      @JsonProperty("notNull")
+      boolean _notNull = false;
+    }
+
+    public static class ExtraProperties {
+      private boolean _enableColumnBasedNullHandling = false;
+      private boolean _noEmptySegment = false;
+      private final Map<String, JsonNode> _unknownProps = new HashMap<>();
+
+      public boolean isEnableColumnBasedNullHandling() {
+        return _enableColumnBasedNullHandling;
+      }
+
+      public void setEnableColumnBasedNullHandling(boolean enableColumnBasedNullHandling) {
+        _enableColumnBasedNullHandling = enableColumnBasedNullHandling;
+      }
+
+      public boolean isNoEmptySegment() {
+        return _noEmptySegment;
+      }
+
+      public void setNoEmptySegment(boolean noEmptySegment) {
+        _noEmptySegment = noEmptySegment;
+      }
+
+      public Map<String, JsonNode> getUnknownProps() {
+        return _unknownProps;
+      }
+
+      @JsonAnySetter
+      public void setAny(String key, JsonNode value) {
+        _unknownProps.put(key, value);
+      }
     }
   }
 }
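
ExtraProperties replaces the former free-form Map<String, Object>: known keys get typed fields, while @JsonAnySetter
still captures anything unrecognized so older test files keep parsing. A minimal standalone sketch of the same
Jackson pattern (the Props class and its keys are hypothetical):

    import com.fasterxml.jackson.annotation.JsonAnySetter;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.HashMap;
    import java.util.Map;

    public class Props {
      private boolean _known;
      private final Map<String, JsonNode> _unknown = new HashMap<>();

      public void setKnown(boolean known) { _known = known; }

      @JsonAnySetter
      public void setAny(String key, JsonNode value) { _unknown.put(key, value); }

      public static void main(String[] args) throws Exception {
        Props p = new ObjectMapper().readValue("{\"known\": true, \"other\": 1}", Props.class);
        System.out.println(p._known + " " + p._unknown); // true {other=1}
      }
    }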
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 1ebf761920..bb6744084a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -31,7 +31,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.regex.Matcher;
@@ -54,7 +53,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -69,7 +67,6 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final Pattern TABLE_NAME_REPLACE_PATTERN = Pattern.compile("\\{([\\w\\d]+)\\}");
   private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
-  private static final Random RANDOM = new Random(42);
   private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
   private static final String IGNORE_FILTER_PROPERTY = "pinot.runIgnored";
   private static final int DEFAULT_NUM_PARTITIONS = 4;
@@ -109,17 +106,17 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       // table will be registered on both servers.
       Map<String, Schema> schemaMap = new HashMap<>();
       for (Map.Entry<String, QueryTestCase.Table> tableEntry : testCase._tables.entrySet()) {
-        boolean allowEmptySegment = !BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps, "noEmptySegment"));
+        boolean allowEmptySegment = !testCase._extraProps.isNoEmptySegment();
         String tableName = testCaseName + "_" + tableEntry.getKey();
         // Testing only OFFLINE table b/c Hybrid table test is a special case to test separately.
         String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
         Schema pinotSchema = constructSchema(tableName, tableEntry.getValue()._schema);
+        pinotSchema.setEnableColumnBasedNullHandling(testCase._extraProps.isEnableColumnBasedNullHandling());
         schemaMap.put(tableName, pinotSchema);
         factory1.registerTable(pinotSchema, offlineTableName);
         factory2.registerTable(pinotSchema, offlineTableName);
         List<QueryTestCase.ColumnAndType> columnAndTypes = tableEntry.getValue()._schema;
         List<GenericRow> genericRows = toRow(columnAndTypes, tableEntry.getValue()._inputs);
-
         // generate segments and dump into server1 and server2
         List<String> partitionColumns = tableEntry.getValue()._partitionColumns;
         int numPartitions = tableEntry.getValue()._partitionCount == null ? DEFAULT_NUM_PARTITIONS
@@ -139,14 +136,17 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
           partitionIdToRowsMap.add(new ArrayList<>());
         }
 
-        for (GenericRow row : genericRows) {
+        int numRows = genericRows.size();
+        for (int i = 0; i < numRows; i++) {
+          GenericRow row = genericRows.get(i);
           if (row == SEGMENT_BREAKER_ROW) {
             addSegments(factory1, factory2, offlineTableName, allowEmptySegment, partitionIdToRowsMap,
                 partitionIdToSegmentsMap, numPartitions);
           } else {
             int partitionId;
             if (partitionColumns == null) {
-              partitionId = RANDOM.nextInt(numPartitions);
+              // Round-robin when there is no partition column
+              partitionId = i % numPartitions;
             } else {
               int hashCode = 0;
               for (String field : partitionColumns) {
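
Replacing Random with i % numPartitions makes segment assignment deterministic, so a given test input always
produces the same segments. The effect in miniature:

    public class RoundRobinAssignment {
      public static void main(String[] args) {
        int numPartitions = 4;
        // Row i always lands in partition i % numPartitions: no seed, no run-to-run drift
        for (int i = 0; i < 6; i++) {
          System.out.println("row " + i + " -> partition " + (i % numPartitions));
        }
      }
    }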
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index fd3ef7a374..c3a34cbd20 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -65,7 +65,7 @@ public class MockInstanceDataManagerFactory {
   // Key is registered table (with or without type)
   private final Map<String, Schema> _registeredSchemaMap;
 
-  private String _serverName;
+  private final String _serverName;
 
   public MockInstanceDataManagerFactory(String serverName) {
     _serverName = serverName;
@@ -162,8 +162,8 @@ public class MockInstanceDataManagerFactory {
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     // TODO: plugin table config constructor
-    TableConfig tableConfig =
-        new TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts").build();
+    TableConfig tableConfig = new TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts")
+        .setNullHandlingEnabled(true).build();
     Schema schema = _schemaMap.get(rawTableName);
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
     config.setOutDir(indexDir.getPath());
@@ -174,7 +174,7 @@ public class MockInstanceDataManagerFactory {
     try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
       driver.init(config, recordReader);
       driver.build();
-      return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap);
+      return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap, tableConfig, schema);
     } catch (Exception e) {
       throw new RuntimeException("Unable to construct immutable segment from records", e);
     }
diff --git a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
index 973d83b70c..cb146b149b 100644
--- a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
+++ b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
@@ -122,9 +122,9 @@
         "outputs": [[8]]
       },
       {
-        "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 5], [a, 4]]",
+        "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 6], [a, 4]]",
         "sql": "SELECT groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY groupingCol",
-        "outputs": [["b", 5], ["a", 4]]
+        "outputs": [["b", 6], ["a", 4]]
       },
       {
         "sql": "SELECT l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
@@ -135,9 +135,9 @@
         "outputs": [["b", 6], ["a", 6]]
       },
       {
-        "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 5], [a, 4]]",
+        "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 6], [a, 4]]",
         "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY groupingCol",
-        "outputs": [["b", 5], ["a", 4]]
+        "outputs": [["b", 6], ["a", 4]]
       },
       {
         "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
diff --git a/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json b/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
index fdccd9cee8..3ed27c8a82 100644
--- a/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
+++ b/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
@@ -60,7 +60,7 @@
       }
     ],
     "extraProps": {
-      "noEmptySegment": "true"
+      "noEmptySegment": true
     }
   }
 }
diff --git a/pinot-query-runtime/src/test/resources/queries/NullHandling.json b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
index 334eb5011a..ee15c88f34 100644
--- a/pinot-query-runtime/src/test/resources/queries/NullHandling.json
+++ b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
@@ -66,5 +66,341 @@
         "keepOutputRowOrder": true
       }
     ]
+  },
+  "table_level_null_handling": {
+    "tables": {
+      "tbl1" : {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "nIntCol1", "type": "INT", "notNull": false},
+          {"name": "nnIntCol1", "type": "INT", "notNull": true},
+          {"name": "strCol2", "type": "STRING"},
+          {"name": "nStrCol2", "type": "STRING", "notNull": false},
+          {"name": "nnStrCol2", "type": "STRING", "notNull": true}
+        ],
+        "inputs": [
+          ["foo", 1, 1, 1, "foo", "foo", "foo"],
+          ["bar", 2, 2, 2, "alice", "alice", "alice"],
+          ["alice", 2, 2, 2, null, null, null],
+          [null, 4, 4, 4, null, null, null],
+          ["bob", null, null, null, null, null, null]
+        ]
+      },
+      "tbl2" : {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "strCol2", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "doubleCol1", "type": "DOUBLE"}
+        ],
+        "inputs": [
+          ["foo", "bob", 3, 3.1416],
+          ["alice", "alice", 4, 2.7183],
+          [null, null, null, null]
+        ]
+      },
+      "tbl3": {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "strCol2", "type": "STRING"}
+        ],
+        "inputs": [
+          ["foo", 1, "foo"],
+          ["bar", 2, "foo"]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "join 3 tables, mixed join conditions with null non-matching",
+        "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+      },
+      {
+        "description": "join 3 tables, mixed join condition with null matching",
+        "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 != {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+      },
+      {
+        "description": "join 2 tables, mixed join conditions of null matching or non-matching",
+        "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 AND {tbl1}.strCol1 = {tbl2}.strCol1"
+      },
+
+      {
+        "description": "nIntCol1 IS NULL is always false",
+        "comment": "When column level nullability is disabled, V2 considers all columns not nullable",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NULL",
+        "h2Sql": "SELECT 0"
+      },
+      {
+        "description": "nIntCol1 IS NOT NULL is always true",
+        "comment": "When column level nullability is disabled, V2 considers all columns not nullable",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL",
+        "h2Sql": "SELECT count(*) FROM {tbl1}"
+      },
+      {
+        "description": "nnIntCol1 IS NULL is always false",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NULL",
+        "h2Sql": "SELECT 0"
+      },
+      {
+        "description": "nnIntCol1 IS NOT NULL is always true",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL",
+        "h2Sql": "SELECT count(*) FROM {tbl1}"
+      },
+      {
+        "description": "intCol1 IS NULL is always false",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NULL",
+        "h2Sql": "SELECT 0"
+      },
+      {
+        "description": "intCol1 IS NULL is always true",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NOT NULL",
+        "h2Sql": "SELECT count(*) FROM {tbl1}"
+      },
+
+      {
+        "description": "Leaf stages may return nulls",
+        "sql": "SET enableNullHandling=true; SELECT nStrCol2 FROM {tbl1} WHERE nIntCol1 = 4 and nStrCol2 IS NOT NULL",
+        "h2Sql": "SELECT null"
+      },
+
+      {
+        "description": "When projected with enableNullHandling=true, intCol1 is considered null",
+        "sql": "SET enableNullHandling=true; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=false, intCol1 is considered -2147483648",
+        "sql": "SET enableNullHandling=false; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=true, nIntCol1 is considered null",
+        "sql": "SET enableNullHandling=true; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=false, nIntCol1 is considered -2147483648",
+        "sql": "SET enableNullHandling=false; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=true, nnIntCol1 is considered null",
+        "sql": "SET enableNullHandling=true; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=false, nnIntCol1 is considered -2147483648",
+        "sql": "SET enableNullHandling=false; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+
+      {
+        "description": "aggregate with null checkers",
+        "sql": "SET enableNullHandling=true; SELECT SUM(intCol1) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+      },
+
+      {
+        "description": "null checks in aggregate are skipped in implicit nullable columns",
+        "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2",
+        "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+      },
+      {
+        "description": "null checks in aggregate are skipped in explicit nullable columns",
+        "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL GROUP BY strCol2",
+        "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+      },
+      {
+        "description": "null checks in aggregate are skipped in not nullable columns",
+        "comment": "Given nnIntCol1 is not nullable, V2 removes the IS NOT NULL predicate",
+        "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL GROUP BY strCol2",
+        "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+      },
+
+      {
+        "description": "aggregate with null checkers on key from group set are skipped on not nullable columns",
+        "comment": "Given strCol2 is not explicitly nullable, V2 returns empty set",
+        "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nnStrCol2 IS NULL GROUP BY strCol2",
+        "h2Sql": "SELECT 0 FROM {tbl1} WHERE false GROUP BY strCol2"
+      },
+      {
+        "description": "aggregate with null checkers on key from group set are skipped on nullable columns",
+        "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nStrCol2 IS NULL GROUP BY nStrCol2",
+        "h2Sql": "SELECT 0 FROM {tbl1} WHERE false GROUP BY strCol2"
+      }
+    ]
+  },
+  "column_level_null_handling": {
+    "extraProps": {
+      "enableColumnBasedNullHandling": true
+    },
+    "tables": {
+      "tbl1" : {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "nIntCol1", "type": "INT", "notNull": false},
+          {"name": "nnIntCol1", "type": "INT", "notNull": true},
+          {"name": "strCol2", "type": "STRING"},
+          {"name": "nStrCol2", "type": "STRING", "notNull": false},
+          {"name": "nnStrCol2", "type": "STRING", "notNull": true}
+        ],
+        "inputs": [
+          ["foo", 1, 1, 1, "foo", "foo", "foo"],
+          ["bar", 2, 2, 2, "alice", "alice", "alice"],
+          ["alice", 2, 2, 2, null, null, null],
+          [null, 4, 4, 4, null, null, null],
+          ["bob", null, null, null, null, null, null]
+        ]
+      },
+      "tbl2" : {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "strCol2", "type": "STRING", "notNull": false},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "doubleCol1", "type": "DOUBLE", "notNull": false}
+        ],
+        "inputs": [
+          ["foo", "bob", 3, 3.1416],
+          ["alice", "alice", 4, 2.7183],
+          ["bob", null, 5, null]
+        ]
+      },
+      "tbl3" : {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "strCol2", "type": "STRING"}
+        ],
+        "inputs": [
+          ["foo", 1, "foo"],
+          ["bar", 2, "foo"]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "join 3 tables, mixed join conditions with null non-matching",
+        "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+      },
+      {
+        "description": "join 3 tables, mixed join condition with null matching",
+        "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 != {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+      },
+      {
+        "description": "join 2 tables, mixed join conditions of null matching or non-matching",
+        "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 AND {tbl1}.strCol1 = {tbl2}.strCol1"
+      },
+      {
+        "description": "aggregate with null checkers",
+        "sql": "SET enableNullHandling=true; SELECT SUM(intCol1) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+      },
+      {
+        "description": "aggregate with null checkers",
+        "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+      },
+      {
+        "description": "aggregate with null checkers on key from group set",
+        "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE strCol2 IS NULL GROUP BY strCol2"
+      },
+      {
+        "description": "select only with NULL handling",
+        "sql": "SET enableNullHandling=true; SELECT strCol1, intCol1, nIntCol1, nnIntCol1, strCol2, nStrCol2, nnStrCol2 FROM {tbl1} WHERE nStrCol2 IS NULL AND nIntCol1 IS NOT NULL",
+        "h2Sql": "SELECT strCol1, intCol1, nIntCol1, nnIntCol1, strCol2, nStrCol2, 'null' FROM {tbl1} WHERE nStrCol2 IS NULL AND nIntCol1 IS NOT NULL"
+      },
+
+      {
+        "description": "Leaf stages should not return nulls",
+        "comment": "Result is the string 'null', not the value null",
+        "sql": "SET enableNullHandling=true; SELECT nnStrCol2 FROM {tbl1} WHERE nIntCol1 = 4",
+        "h2Sql": "SELECT 'null'"
+      },
+
+      {
+        "description": "nIntCol1 IS NULL is honored",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NULL"
+      },
+      {
+        "description": "nIntCol1 IS NOT NULL is honored",
+        "comment": "When column level nullability is disabled, V2 considers all columns not nullable",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL"
+      },
+      {
+        "description": "nnIntCol1 IS NULL is always false",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NULL",
+        "h2Sql": "SELECT 0"
+      },
+      {
+        "description": "nnIntCol1 IS NOT NULL is always true",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL",
+        "h2Sql": "SELECT count(*) FROM {tbl1}"
+      },
+      {
+        "description": "(IS NULL) Column with no explicit nullability are treated as nullable",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NULL"
+      },
+      {
+        "description": "(IS NOT NULL) Column with no explicit nullability are treated as nullable",
+        "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NOT NULL"
+      },
+
+      {
+        "description": "aggregate with null checkers on key from group set are skipped on not nullable columns",
+        "comment": "Given strCol2 is not explicitly nullable, V2 considers the where predicate always false",
+        "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nnStrCol2 IS NULL GROUP BY strCol2",
+        "h2Sql": "SELECT COUNT(*) FROM {tbl1} WHERE false GROUP BY strCol2"
+      },
+      {
+        "description": "aggregate with null checkers on key from group set are honored on nullable columns",
+        "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nStrCol2 IS NULL GROUP BY nStrCol2"
+      },
+
+      {
+        "description": "null checks in aggregate are honored in implicit nullable columns",
+        "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+      },
+      {
+        "description": "null checks in aggregate are honored in explicit nullable columns",
+        "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL GROUP BY strCol2"
+      },
+      {
+        "description": "null checks in aggregate are skipped in not nullable columns",
+        "comment": "Given nnIntCol1 is not nullable, V2 removes the IS NOT NULL predicate",
+        "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL GROUP BY strCol2",
+        "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+      },
+
+      {
+        "description": "When projected with enableNullHandling=true, intCol1 is considered null",
+        "sql": "SET enableNullHandling=true; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=false, intCol1 is considered -2147483648",
+        "sql": "SET enableNullHandling=false; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=true, nIntCol1 is considered null",
+        "sql": "SET enableNullHandling=true; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=false, nIntCol1 is considered -2147483648",
+        "sql": "SET enableNullHandling=false; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=true, nnIntCol1 is considered -2147483648",
+        "sql": "SET enableNullHandling=true; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+      },
+      {
+        "description": "When projected with enableNullHandling=false, nnIntCol1 is considered -2147483648",
+        "sql": "SET enableNullHandling=false; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+        "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+      }
+    ]
   }
-}
\ No newline at end of file
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 2f0ed2131b..359ea2dba8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -71,6 +72,18 @@ public class ImmutableSegmentLoader {
     return load(indexDir, defaultIndexLoadingConfig, null, false);
   }
 
+  /**
+   * Loads the segment with the specified table config and schema.
+   * This method is meant for read-only access, i.e. it does not modify the segment.
+   */
+  public static ImmutableSegment load(File indexDir, ReadMode readMode, TableConfig tableConfig,
+      @Nullable Schema schema)
+      throws Exception {
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+    indexLoadingConfig.setReadMode(readMode);
+    return load(indexDir, indexLoadingConfig, schema, false);
+  }
+
   /**
    * Loads the segment with an empty schema but a specified IndexLoadingConfig.
    * This method may modify the segment, e.g. to convert the segment format or to add or remove indices.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index c47d420c82..36f5d6e81c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -256,8 +256,8 @@ public class MutableSegmentImpl implements MutableSegment {
     }
 
     Set<IndexType> specialIndexes =
-        Sets.newHashSet(StandardIndexes.dictionary(), // dictionaries implement other contract
-            StandardIndexes.nullValueVector()); // null value vector implement other contract
+        Sets.newHashSet(StandardIndexes.dictionary(), // dictionary implements a different contract
+            StandardIndexes.nullValueVector()); // null value vector implements a different contract
 
     // Initialize for each column
     for (FieldSpec fieldSpec : _physicalFieldSpecs) {
@@ -336,7 +336,7 @@ public class MutableSegmentImpl implements MutableSegment {
       }
 
       // Null value vector
-      MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null;
+      MutableNullValueVector nullValueVector = isNullable(fieldSpec) ? new MutableNullValueVector() : null;
 
       Map<IndexType, MutableIndex> mutableIndexes = new HashMap<>();
       for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
@@ -401,6 +401,10 @@ public class MutableSegmentImpl implements MutableSegment {
     }
   }
 
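+  /**
+   * When column-based null handling is enabled in the schema, nullability comes from the
+   * field spec; otherwise the table-level nullHandlingEnabled flag applies to all columns.
+   */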
+  private boolean isNullable(FieldSpec fieldSpec) {
+    return _schema.isEnableColumnBasedNullHandling() ? fieldSpec.isNullable() : _nullHandlingEnabled;
+  }
+
   private <C extends IndexConfig> void addMutableIndex(Map<IndexType, MutableIndex> mutableIndexes,
       IndexType<C, ?, ?> indexType, MutableIndexContext context, FieldIndexConfigs indexConfigs) {
     MutableIndex mutableIndex = indexType.createMutableIndex(context, indexConfigs.getConfig(indexType));
@@ -658,7 +662,7 @@ public class MutableSegmentImpl implements MutableSegment {
       }
 
       // Update the null value vector even if a null value is somehow produced
-      if (_nullHandlingEnabled && row.isNullValue(column)) {
+      if (indexContainer._nullValueVector != null && row.isNullValue(column)) {
         indexContainer._nullValueVector.setNull(docId);
       }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 10e0a5c1d0..3a7a96415a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -105,7 +105,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private File _indexDir;
   private int _totalDocs;
   private int _docIdCounter;
-  private boolean _nullHandlingEnabled;
 
   @Override
   public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
@@ -133,10 +132,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
     for (String columnName : indexConfigs.keySet()) {
       FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
-      if (fieldSpec == null) {
-        Preconditions.checkState(schema.hasColumn(columnName),
-            "Cannot create index for column: %s because it is not in schema", columnName);
-      }
+      Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in the schema", columnName);
       if (fieldSpec.isVirtualColumn()) {
         LOGGER.warn("Ignoring index creation for virtual column " + columnName);
         continue;
@@ -207,7 +203,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       if (oldFwdCreator != null) {
         Object fakeForwardValue = calculateRawValueForTextIndex(dictEnabledColumn, config, fieldSpec);
         if (fakeForwardValue != null) {
-          @SuppressWarnings("unchecked")
           ForwardIndexCreator castedOldFwdCreator = (ForwardIndexCreator) oldFwdCreator;
           SameValueForwardIndexCreator fakeValueFwdCreator =
               new SameValueForwardIndexCreator(fakeForwardValue, castedOldFwdCreator);
@@ -218,9 +213,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
 
     // Although NullValueVector is implemented as an index, it needs to be treated in a different way than other indexes
-    _nullHandlingEnabled = _config.isNullHandlingEnabled();
-    if (_nullHandlingEnabled) {
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      if (isNullable(fieldSpec)) {
         // Initialize Null value vector map
         String columnName = fieldSpec.getName();
         _nullValueVectorCreatorMap.put(columnName, new NullValueVectorCreator(_indexDir, columnName));
@@ -228,6 +222,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
+  private boolean isNullable(FieldSpec fieldSpec) {
+    return _schema.isEnableColumnBasedNullHandling() ? fieldSpec.isNullable() : _config.isNullHandlingEnabled();
+  }
+
   private FieldIndexConfigs adaptConfig(String columnName, FieldIndexConfigs config,
       ColumnIndexCreationInfo columnIndexCreationInfo, SegmentGeneratorConfig segmentCreationSpec) {
     FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder(config);
@@ -325,13 +323,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       }
     }
 
-    if (_nullHandlingEnabled) {
-      for (Map.Entry<String, NullValueVectorCreator> entry : _nullValueVectorCreatorMap.entrySet()) {
-        String columnName = entry.getKey();
-        // If row has null value for given column name, add to null value vector
-        if (row.isNullValue(columnName)) {
-          _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
-        }
+    for (Map.Entry<String, NullValueVectorCreator> entry : _nullValueVectorCreatorMap.entrySet()) {
+      // If the row has a null value for the given column, add it to the null value vector
+      if (row.isNullValue(entry.getKey())) {
+        entry.getValue().setNull(_docIdCounter);
       }
     }
 
@@ -369,7 +364,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
   private void indexColumnValue(PinotSegmentColumnReader colReader,
       Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String columnName, FieldSpec fieldSpec,
-      SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos, NullValueVectorCreator nullVec)
+      SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos,
+      @Nullable NullValueVectorCreator nullVec)
       throws IOException {
     Object columnValueToIndex = colReader.getValue(sourceDocId);
     if (columnValueToIndex == null) {
@@ -382,7 +378,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
     }
 
-    if (_nullHandlingEnabled) {
+    if (nullVec != null) {
       if (colReader.isNull(sourceDocId)) {
         nullVec.setNull(onDiskDocPos);
       }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 7fe5bd3385..bdd54224e0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -133,6 +133,10 @@ public class IndexLoadingConfig {
     this(instanceDataManagerConfig, tableConfig, null);
   }
 
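+  /**
+   * Creates a loading config from the table config and (optionally) the schema alone,
+   * without an InstanceDataManagerConfig; intended for read-only segment loading.
+   */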
+  public IndexLoadingConfig(TableConfig tableConfig, @Nullable Schema schema) {
+    extractFromTableConfigAndSchema(tableConfig, schema);
+  }
+
   public IndexLoadingConfig() {
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 9fc15e86b1..acfd4827c6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -530,9 +530,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
       }
     }
 
-    if (_indexLoadingConfig.getTableConfig() != null
-        && _indexLoadingConfig.getTableConfig().getIndexingConfig() != null
-        && _indexLoadingConfig.getTableConfig().getIndexingConfig().isNullHandlingEnabled()) {
+    if (isNullable(fieldSpec)) {
       if (!_segmentWriter.hasIndexFor(column, StandardIndexes.nullValueVector())) {
         try (NullValueVectorCreator nullValueVectorCreator =
             new NullValueVectorCreator(_indexDir, fieldSpec.getName())) {
@@ -550,6 +548,16 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
         fieldSpec, true/*hasDictionary*/, dictionaryElementSize);
   }
 
+  private boolean isNullable(FieldSpec fieldSpec) {
+    if (_schema.isEnableColumnBasedNullHandling()) {
+      return fieldSpec.isNullable();
+    } else {
+      return _indexLoadingConfig.getTableConfig() != null
+          && _indexLoadingConfig.getTableConfig().getIndexingConfig() != null
+          && _indexLoadingConfig.getTableConfig().getIndexingConfig().isNullHandlingEnabled();
+    }
+  }
+
   /**
    * Helper method to create the V1 indices (dictionary and forward index) for a column with derived values.
    * TODO:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
index c468870821..9c01b3a321 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
@@ -19,8 +19,10 @@
 
 package org.apache.pinot.segment.local.segment.index.nullvalue;
 
+import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -33,15 +35,16 @@ import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.index.AbstractIndexType;
 import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
+import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -67,7 +70,7 @@ public class NullValueIndexType extends AbstractIndexType<IndexConfig, NullValue
 
   @Override
   public IndexConfig getDefaultConfig() {
-    return IndexConfig.DISABLED;
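+    // The null value vector index is enabled by default; whether it is actually created for
+    // a given column is decided per column by createDeserializer().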
+    return IndexConfig.ENABLED;
   }
 
   @Override
@@ -77,11 +80,27 @@ public class NullValueIndexType extends AbstractIndexType<IndexConfig, NullValue
 
   @Override
   public ColumnConfigDeserializer<IndexConfig> createDeserializer() {
-    return IndexConfigDeserializer.ifIndexingConfig(
-        IndexConfigDeserializer.alwaysCall((TableConfig tableConfig, Schema schema) ->
-            tableConfig.getIndexingConfig().isNullHandlingEnabled()
-                ? IndexConfig.ENABLED
-                : IndexConfig.DISABLED));
+    return (TableConfig tableConfig, Schema schema) -> {
+      Collection<FieldSpec> allFieldSpecs = schema.getAllFieldSpecs();
+      Map<String, IndexConfig> configMap = Maps.newHashMapWithExpectedSize(allFieldSpecs.size());
+
+      boolean enableColumnBasedNullHandling = schema.isEnableColumnBasedNullHandling();
+      boolean nullHandlingEnabled = tableConfig.getIndexingConfig() != null
+          && tableConfig.getIndexingConfig().isNullHandlingEnabled();
+
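+      // Schema-level column-based null handling takes precedence over the table-level flag:
+      // when enabled, each field's own nullability decides whether the index is created.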
+      for (FieldSpec fieldSpec : allFieldSpecs) {
+        boolean enabled = enableColumnBasedNullHandling ? fieldSpec.isNullable() : nullHandlingEnabled;
+        configMap.put(fieldSpec.getName(), enabled ? IndexConfig.ENABLED : IndexConfig.DISABLED);
+      }
+      return configMap;
+    };
   }
 
   public NullValueVectorCreator createIndexCreator(File indexDir, String columnName) {
@@ -116,10 +135,14 @@ public class NullValueIndexType extends AbstractIndexType<IndexConfig, NullValue
     public NullValueVectorReader createIndexReader(SegmentDirectory.Reader segmentReader,
         FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata)
           throws IOException {
-      if (!segmentReader.hasIndexFor(metadata.getColumnName(), StandardIndexes.nullValueVector())) {
+      IndexType<IndexConfig, NullValueVectorReader, ?> indexType = StandardIndexes.nullValueVector();
+      if (fieldIndexConfigs.getConfig(indexType).isDisabled()) {
+        return null;
+      }
+      if (!segmentReader.hasIndexFor(metadata.getColumnName(), indexType)) {
         return null;
       }
-      PinotDataBuffer buffer = segmentReader.getIndexFor(metadata.getColumnName(), StandardIndexes.nullValueVector());
+      PinotDataBuffer buffer = segmentReader.getIndexFor(metadata.getColumnName(), indexType);
       return new NullValueVectorReaderImpl(buffer);
     }
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexTypeTest.java
new file mode 100644
index 0000000000..e8a15fc82f
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexTypeTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.nullvalue;
+
+import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class NullValueIndexTypeTest {
+
+  @DataProvider(name = "provideCases")
+  public Object[][] provideCases() {
+    return new Object[][]{
+        // This is the semantic table, assuming a null bitmap buffer exists in the segment
+        // enableColumnBasedNullHandling | table nullable | column nullable | expected index config
+        new Object[]{false, false, false, IndexConfig.DISABLED},
+        new Object[]{false, false, true, IndexConfig.DISABLED},
+        new Object[]{false, true, false, IndexConfig.ENABLED},
+        new Object[]{false, true, true, IndexConfig.ENABLED},
+        new Object[]{true, false, false, IndexConfig.DISABLED},
+        new Object[]{true, false, true, IndexConfig.ENABLED},
+        new Object[]{true, true, false, IndexConfig.DISABLED},
+        new Object[]{true, true, true, IndexConfig.ENABLED}
+    };
+  }
+
+  public static class ConfTest extends AbstractSerdeIndexContract {
+
+    protected void assertEquals(IndexConfig expected) {
+      Assert.assertEquals(getActualConfig("dimStr", StandardIndexes.nullValueVector()), expected);
+    }
+
+    @Test(dataProvider = "provideCases", dataProviderClass = NullValueIndexTypeTest.class)
+    public void isEnabledWhenNullable(boolean enableColumnBasedNullHandling, boolean tableNullable,
+        boolean fieldNullable, IndexConfig expected) {
+      _schema.setEnableColumnBasedNullHandling(enableColumnBasedNullHandling);
+      _tableConfig.getIndexingConfig().setNullHandlingEnabled(tableNullable);
+
+      FieldSpec fieldSpec = _schema.getFieldSpecFor("dimStr");
+      fieldSpec.setNullable(fieldNullable);
+
+      assertEquals(expected);
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index 978862e600..a3dc1bbef5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -44,6 +44,7 @@ import org.apache.pinot.spi.utils.TimestampUtils;
  * <p>- <code>IsSingleValueField</code>: single-value or multi-value field.
  * <p>- <code>DefaultNullValue</code>: when no value found for this field, use this value. Stored in string format.
  * <p>- <code>VirtualColumnProvider</code>: the virtual column provider to use for this field.
+ * <p>- <code>NotNull</code>: whether the column accepts nulls or not. Defaults to false.
  */
 @SuppressWarnings("unused")
 public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
@@ -99,6 +100,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
   protected String _name;
   protected DataType _dataType;
   protected boolean _isSingleValueField = true;
+  protected boolean _notNull = false;
 
   // NOTE: This only applies to STRING column, which is the max number of characters
   private int _maxLength = DEFAULT_MAX_LENGTH;
@@ -300,6 +302,30 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
     _transformFunction = transformFunction;
   }
 
+  /**
+   * Returns whether the column is nullable or not.
+   */
+  @JsonIgnore
+  public boolean isNullable() {
+    return !_notNull;
+  }
+
+  /**
+   * @see #isNullable()
+   */
+  @JsonIgnore
+  public void setNullable(boolean nullable) {
+    _notNull = !nullable;
+  }
+
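+  /**
+   * JSON accessor: the flag is serialized as {@code notNull}, while {@link #isNullable()}
+   * exposes the inverse view and is ignored during serialization.
+   */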
+  public boolean isNotNull() {
+    return _notNull;
+  }
+
+  public void setNotNull(boolean notNull) {
+    _notNull = notNull;
+  }
+
   /**
    * Returns the {@link ObjectNode} representing the field spec.
    * <p>Only contains fields with non-default value.
@@ -317,6 +343,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
     }
     appendDefaultNullValue(jsonObject);
     appendTransformFunction(jsonObject);
+    jsonObject.put("notNull", _notNull);
     return jsonObject;
   }
 
@@ -381,7 +408,8 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
         .isEqual(_isSingleValueField, that._isSingleValueField) && EqualityUtils
         .isEqual(getStringValue(_defaultNullValue), getStringValue(that._defaultNullValue)) && EqualityUtils
         .isEqual(_maxLength, that._maxLength) && EqualityUtils.isEqual(_transformFunction, that._transformFunction)
-        && EqualityUtils.isEqual(_virtualColumnProvider, that._virtualColumnProvider);
+        && EqualityUtils.isEqual(_virtualColumnProvider, that._virtualColumnProvider)
+        && EqualityUtils.isEqual(_notNull, that._notNull);
   }
 
   @Override
@@ -393,6 +421,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
     result = EqualityUtils.hashCodeOf(result, _maxLength);
     result = EqualityUtils.hashCodeOf(result, _transformFunction);
     result = EqualityUtils.hashCodeOf(result, _virtualColumnProvider);
+    result = EqualityUtils.hashCodeOf(result, _notNull);
     return result;
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 4bd63a16df..63add34987 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -64,6 +65,7 @@ public final class Schema implements Serializable {
   private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
 
   private String _schemaName;
+  private boolean _enableColumnBasedNullHandling;
   private final List<DimensionFieldSpec> _dimensionFieldSpecs = new ArrayList<>();
   private final List<MetricFieldSpec> _metricFieldSpecs = new ArrayList<>();
   private TimeFieldSpec _timeFieldSpec;
@@ -165,6 +167,14 @@ public final class Schema implements Serializable {
     _schemaName = schemaName;
   }
 
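+  /**
+   * Whether nullability is decided per column from each FieldSpec's notNull flag instead of
+   * the table-level nullHandlingEnabled setting.
+   */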
+  public boolean isEnableColumnBasedNullHandling() {
+    return _enableColumnBasedNullHandling;
+  }
+
+  public void setEnableColumnBasedNullHandling(boolean enableColumnBasedNullHandling) {
+    _enableColumnBasedNullHandling = enableColumnBasedNullHandling;
+  }
+
   public List<String> getPrimaryKeyColumns() {
     return _primaryKeyColumns;
   }
@@ -427,6 +437,7 @@ public final class Schema implements Serializable {
   public ObjectNode toJsonObject() {
     ObjectNode jsonObject = JsonUtils.newObjectNode();
     jsonObject.put("schemaName", _schemaName);
+    jsonObject.set("enableColumnBasedNullHandling", JsonUtils.objectToJsonNode(_enableColumnBasedNullHandling));
     if (!_dimensionFieldSpecs.isEmpty()) {
       ArrayNode jsonArray = JsonUtils.newArrayNode();
       for (DimensionFieldSpec dimensionFieldSpec : _dimensionFieldSpecs) {
@@ -520,6 +531,58 @@ public final class Schema implements Serializable {
       return this;
     }
 
+    public SchemaBuilder setEnableColumnBasedNullHandling(boolean enableColumnBasedNullHandling) {
+      _schema.setEnableColumnBasedNullHandling(enableColumnBasedNullHandling);
+      return this;
+    }
+
+    public SchemaBuilder addField(FieldSpec fieldSpec) {
+      _schema.addField(fieldSpec);
+      return this;
+    }
+
+    public SchemaBuilder addMetricField(String name, DataType dataType) {
+      return addMetricField(name, dataType, ignore -> { });
+    }
+
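+    /**
+     * Adds a metric field, letting the caller customize the spec (e.g. nullability or the
+     * default null value) through the consumer before it is added to the schema.
+     */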
+    public SchemaBuilder addMetricField(String name, DataType dataType, Consumer<MetricFieldSpec> customizer) {
+      MetricFieldSpec fieldSpec = new MetricFieldSpec();
+      return addField(fieldSpec, name, dataType, customizer);
+    }
+
+    public SchemaBuilder addDimensionField(String name, DataType dataType) {
+      return addDimensionField(name, dataType, ignore -> { });
+    }
+
+    public SchemaBuilder addDimensionField(String name, DataType dataType, Consumer<DimensionFieldSpec> customizer) {
+      DimensionFieldSpec fieldSpec = new DimensionFieldSpec();
+      return addField(fieldSpec, name, dataType, customizer);
+    }
+
+    public SchemaBuilder addDateTimeField(String name, DataType dataType, String format, String granularity) {
+      return addDateTimeField(name, dataType, format, granularity, ignore -> { });
+    }
+
+    public SchemaBuilder addDateTimeField(String name, DataType dataType, String format, String granularity,
+        Consumer<DateTimeFieldSpec> customizer) {
+      DateTimeFieldSpec fieldSpec = new DateTimeFieldSpec();
+      fieldSpec.setFormat(format);
+      fieldSpec.setGranularity(granularity);
+      return addField(fieldSpec, name, dataType, customizer);
+    }
+
+    private <E extends FieldSpec> SchemaBuilder addField(E fieldSpec, String name, DataType dataType,
+        Consumer<E> customizer) {
+      fieldSpec.setName(name);
+      fieldSpec.setDataType(dataType);
+      customizer.accept(fieldSpec);
+      _schema.addField(fieldSpec);
+      return this;
+    }
+
     /**
      * Add single value dimensionFieldSpec
      */
@@ -674,7 +737,8 @@ public final class Schema implements Serializable {
         && EqualityUtils.isEqual(_timeFieldSpec, that._timeFieldSpec)
         && EqualityUtils.isEqualIgnoreOrder(_dateTimeFieldSpecs, that._dateTimeFieldSpecs)
         && EqualityUtils.isEqualIgnoreOrder(_complexFieldSpecs, that._complexFieldSpecs)
-        && EqualityUtils.isEqual(_primaryKeyColumns, that._primaryKeyColumns);
+        && EqualityUtils.isEqual(_primaryKeyColumns, that._primaryKeyColumns)
+        && EqualityUtils.isEqual(_enableColumnBasedNullHandling, that._enableColumnBasedNullHandling);
     //@formatter:on
   }
 
@@ -733,6 +797,7 @@ public final class Schema implements Serializable {
     result = EqualityUtils.hashCodeOf(result, _dateTimeFieldSpecs);
     result = EqualityUtils.hashCodeOf(result, _complexFieldSpecs);
     result = EqualityUtils.hashCodeOf(result, _primaryKeyColumns);
+    result = EqualityUtils.hashCodeOf(result, _enableColumnBasedNullHandling);
     return result;
   }
 

