You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/20 23:00:49 UTC
(pinot) branch master updated: Explicit null handling (#11960)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 35748a013b Explicit null handling (#11960)
35748a013b is described below
commit 35748a013b486b3526f7f6d53e6e1bee930c6e6a
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Tue Nov 21 00:00:44 2023 +0100
Explicit null handling (#11960)
---
.../apache/pinot/common/data/FieldSpecTest.java | 45 +++
.../org/apache/pinot/common/data/SchemaTest.java | 26 ++
.../query/selection/SelectionOperatorUtils.java | 2 +
.../src/test/resources/test_null_handling.schema | 1 -
.../org/apache/pinot/query/type/TypeFactory.java | 63 ++--
.../query/queries/ResourceBasedQueryPlansTest.java | 8 +-
.../apache/pinot/query/type/TypeFactoryTest.java | 199 +++++++++++-
.../query/runtime/queries/QueryRunnerTest.java | 3 +-
.../query/runtime/queries/QueryRunnerTestBase.java | 58 +++-
.../runtime/queries/ResourceBasedQueriesTest.java | 14 +-
.../testutils/MockInstanceDataManagerFactory.java | 8 +-
.../src/test/resources/queries/CountDistinct.json | 8 +-
.../test/resources/queries/MetadataTestQuery.json | 2 +-
.../src/test/resources/queries/NullHandling.json | 338 ++++++++++++++++++++-
.../immutable/ImmutableSegmentLoader.java | 13 +
.../indexsegment/mutable/MutableSegmentImpl.java | 12 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 32 +-
.../segment/index/loader/IndexLoadingConfig.java | 4 +
.../defaultcolumn/BaseDefaultColumnHandler.java | 14 +-
.../index/nullvalue/NullValueIndexType.java | 41 ++-
.../index/nullvalue/NullValueIndexTypeTest.java | 66 ++++
.../java/org/apache/pinot/spi/data/FieldSpec.java | 31 +-
.../java/org/apache/pinot/spi/data/Schema.java | 67 +++-
23 files changed, 962 insertions(+), 93 deletions(-)
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
index 75cd4d9cdc..91641e1afa 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.common.data;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -404,4 +406,47 @@ public class FieldSpecTest {
jsonString.append('}');
return jsonString.toString();
}
+
+ @DataProvider(name = "nullableCases")
+ public static Object[][] nullableCases() {
+ return new Object[][] {
+ // declared notNull (null means the "notNull" property is omitted from the JSON)
+ new Object[] {null},
+ new Object[] {false},
+ new Object[] {true}
+ };
+ }
+
+ @Test(dataProvider = "nullableCases")
+ void testNullability(Boolean declared)
+ throws IOException {
+ boolean expected = declared == Boolean.TRUE;
+ String json;
+ if (declared == null) {
+ json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\"}";
+ } else {
+ json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\", \"notNull\": " + declared + "}";
+ }
+ DimensionFieldSpec fieldSpec = JsonUtils.stringToObject(json, DimensionFieldSpec.class);
+
+ Assert.assertEquals(fieldSpec.isNotNull(), expected, "Unexpected notNull read when declared as " + declared);
+ Assert.assertEquals(fieldSpec.isNullable(), !expected, "Unexpected nullable read when declared as " + declared);
+ }
+
+ @Test(dataProvider = "nullableCases")
+ void testNullabilityIdempotency(Boolean declared)
+ throws JsonProcessingException {
+ String json;
+ if (declared == null) {
+ json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\"}";
+ } else {
+ json = "{\"name\": \"col1\", \"dataType\":\"BOOLEAN\", \"notNull\": " + declared + "}";
+ }
+ DimensionFieldSpec fieldSpec = JsonUtils.stringToObject(json, DimensionFieldSpec.class);
+
+ String serialized = JsonUtils.objectToString(fieldSpec);
+ DimensionFieldSpec deserialized = JsonUtils.stringToObject(serialized, DimensionFieldSpec.class);
+
+ Assert.assertEquals(deserialized, fieldSpec, "Changes detected while checking serialize/deserialize idempotency");
+ }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
index e2e02ff193..626a091005 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.common.data;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
+import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Timestamp;
@@ -33,6 +35,7 @@ import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.TimeGranularitySpec.TimeFormat;
import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -320,6 +323,29 @@ public class SchemaTest {
Assert.assertNotEquals(jsonSchemaToCompare, jsonSchema);
}
+ @Test
+ public void testSerializeDeserializeOptions()
+ throws IOException {
+ String json = "{\n"
+ + " \"primaryKeyColumns\" : null,\n"
+ + " \"timeFieldSpec\" : null,\n"
+ + " \"schemaName\" : null,\n"
+ + " \"enableColumnBasedNullHandling\" : true,\n"
+ + " \"dimensionFieldSpecs\" : [ ],\n"
+ + " \"metricFieldSpecs\" : [ ],\n"
+ + " \"dateTimeFieldSpecs\" : [ ]\n"
+ + "}";
+ JsonNode expectedNode = JsonUtils.stringToJsonNode(json);
+
+ Schema schema = JsonUtils.jsonNodeToObject(expectedNode, Schema.class);
+ Assert.assertTrue(schema.isEnableColumnBasedNullHandling(), "Column null handling should be enabled");
+
+ String serialized = JsonUtils.objectToString(schema);
+ Schema deserialized = JsonUtils.stringToObject(serialized, Schema.class);
+
+ Assert.assertEquals(deserialized, schema, "Changes detected while checking serialize/deserialize idempotency");
+ }
+
@Test
public void testSimpleDateFormat()
throws Exception {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 681fbe44b8..9d4bc6dd8b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -354,6 +354,8 @@ public class SelectionOperatorUtils {
/**
* Build a {@link DataTable} from a {@link Collection} of selection rows with {@link DataSchema}. (Server side)
+ *
+ * This method is allowed to modify the given rows. Specifically, it may remove null cells from them.
*/
public static DataTable getDataTableFromRows(Collection<Object[]> rows, DataSchema dataSchema,
boolean nullHandlingEnabled)
diff --git a/pinot-integration-tests/src/test/resources/test_null_handling.schema b/pinot-integration-tests/src/test/resources/test_null_handling.schema
index 2113389249..1e9c0ec10b 100644
--- a/pinot-integration-tests/src/test/resources/test_null_handling.schema
+++ b/pinot-integration-tests/src/test/resources/test_null_handling.schema
@@ -30,4 +30,3 @@
},
"schemaName": "mytable"
}
-
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
index cf64492125..ccc0f41a08 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.type;
import java.util.Map;
+import java.util.function.Predicate;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -31,8 +32,8 @@ import org.apache.pinot.spi.data.Schema;
* Extends Java-base TypeFactory from Calcite.
*
* <p>{@link JavaTypeFactoryImpl} is used here because we are not overriding much of the TypeFactory methods
- * required by Calcite. We will start extending {@link SqlTypeFactoryImpl} or even {@link RelDataTypeFactory}
- * when necessary for Pinot to override such mechanism.
+ * required by Calcite. We will start extending {@link org.apache.calcite.sql.type.SqlTypeFactoryImpl} or even
+ * {@link org.apache.calcite.rel.type.RelDataTypeFactory} when necessary for Pinot to override such mechanism.
*
* <p>Noted that {@link JavaTypeFactoryImpl} is subject to change. Please pay extra attention to this class when
* upgrading Calcite versions.
@@ -45,20 +46,36 @@ public class TypeFactory extends JavaTypeFactoryImpl {
public RelDataType createRelDataTypeFromSchema(Schema schema) {
Builder builder = new Builder(this);
+ Predicate<FieldSpec> isNullable;
+ if (schema.isEnableColumnBasedNullHandling()) {
+ isNullable = FieldSpec::isNullable;
+ } else {
+ isNullable = fieldSpec -> false;
+ }
for (Map.Entry<String, FieldSpec> e : schema.getFieldSpecMap().entrySet()) {
- builder.add(e.getKey(), toRelDataType(e.getValue()));
+ builder.add(e.getKey(), toRelDataType(e.getValue(), isNullable));
}
return builder.build();
}
- private RelDataType toRelDataType(FieldSpec fieldSpec) {
+ private RelDataType toRelDataType(FieldSpec fieldSpec, Predicate<FieldSpec> isNullable) {
+ RelDataType type = createSqlType(getSqlTypeName(fieldSpec));
+ boolean isArray = !fieldSpec.isSingleValueField();
+ if (isArray) {
+ type = createArrayType(type, -1);
+ }
+ if (isNullable.test(fieldSpec)) {
+ type = createTypeWithNullability(type, true);
+ }
+ return type;
+ }
+
+ private SqlTypeName getSqlTypeName(FieldSpec fieldSpec) {
switch (fieldSpec.getDataType()) {
case INT:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.INTEGER)
- : createArrayType(createSqlType(SqlTypeName.INTEGER), -1);
+ return SqlTypeName.INTEGER;
case LONG:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BIGINT)
- : createArrayType(createSqlType(SqlTypeName.BIGINT), -1);
+ return SqlTypeName.BIGINT;
// Map float and double to the same RelDataType so that queries like
// `select count(*) from table where aFloatColumn = 0.05` works correctly in multi-stage query engine.
//
@@ -71,34 +88,32 @@ public class TypeFactory extends JavaTypeFactoryImpl {
// With float and double mapped to the same RelDataType, the behavior in multi-stage query engine will be the same
// as the query in v1 query engine.
case FLOAT:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE)
- : createArrayType(createSqlType(SqlTypeName.REAL), -1);
+ if (fieldSpec.isSingleValueField()) {
+ return SqlTypeName.DOUBLE;
+ } else {
+ // TODO: This may be wrong. The reason why we want to use DOUBLE in single value float may also apply here
+ return SqlTypeName.REAL;
+ }
case DOUBLE:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE)
- : createArrayType(createSqlType(SqlTypeName.DOUBLE), -1);
+ return SqlTypeName.DOUBLE;
case BOOLEAN:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BOOLEAN)
- : createArrayType(createSqlType(SqlTypeName.BOOLEAN), -1);
+ return SqlTypeName.BOOLEAN;
case TIMESTAMP:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.TIMESTAMP)
- : createArrayType(createSqlType(SqlTypeName.TIMESTAMP), -1);
+ return SqlTypeName.TIMESTAMP;
case STRING:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARCHAR)
- : createArrayType(createSqlType(SqlTypeName.VARCHAR), -1);
+ return SqlTypeName.VARCHAR;
case BYTES:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARBINARY)
- : createArrayType(createSqlType(SqlTypeName.VARBINARY), -1);
+ return SqlTypeName.VARBINARY;
case BIG_DECIMAL:
- return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DECIMAL)
- : createArrayType(createSqlType(SqlTypeName.DECIMAL), -1);
+ return SqlTypeName.DECIMAL;
case JSON:
- return createSqlType(SqlTypeName.VARCHAR);
+ return SqlTypeName.VARCHAR;
case LIST:
// TODO: support LIST, MV column should go fall into this category.
case STRUCT:
case MAP:
default:
- String message = String.format("Unsupported type: %s ", fieldSpec.getDataType().toString());
+ String message = String.format("Unsupported type: %s ", fieldSpec.getDataType());
throw new UnsupportedOperationException(message);
}
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
index 562e3d23fb..fa58d62291 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/queries/ResourceBasedQueryPlansTest.java
@@ -47,12 +47,14 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
@Test(dataProvider = "testResourceQueryPlannerTestCaseProviderHappyPath")
- public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, String query, String output) {
+ public void testQueryExplainPlansAndQueryPlanConversion(String testCaseName, String description, String query,
+ String output) {
try {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
Assert.assertEquals(explainedPlan, output,
- String.format("Test case %s for query %s doesn't match expected output: %s", testCaseName, query, output));
+ String.format("Test case %s for query %s (%s) doesn't match expected output: %s", testCaseName, description,
+ query, output));
// use a regex to exclude the
String queryWithoutExplainPlan = query.replaceFirst(EXPLAIN_REGEX, "");
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(queryWithoutExplainPlan);
@@ -105,7 +107,7 @@ public class ResourceBasedQueryPlansTest extends QueryEnvironmentTestBase {
String sql = queryCase._sql;
List<String> orgOutput = queryCase._output;
String concatenatedOutput = StringUtils.join(orgOutput, "");
- Object[] testEntry = new Object[]{testCaseName, sql, concatenatedOutput};
+ Object[] testEntry = new Object[]{testCaseName, queryCase._description, sql, concatenatedOutput};
providerContent.add(testEntry);
}
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java
index 698f02591e..8c1686b2c8 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/type/TypeFactoryTest.java
@@ -18,7 +18,11 @@
*/
package org.apache.pinot.query.type;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.ArraySqlType;
@@ -27,12 +31,203 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TypeFactoryTest {
- private static final TypeSystem TYPE_SYSTEM = new TypeSystem() {
- };
+ private static final TypeSystem TYPE_SYSTEM = new TypeSystem();
+
+ @DataProvider(name = "relDataTypeConversion")
+ public Iterator<Object[]> relDataTypeConversion() {
+ ArrayList<Object[]> cases = new ArrayList<>();
+
+ JavaTypeFactory javaTypeFactory = new JavaTypeFactoryImpl(TYPE_SYSTEM);
+
+ for (FieldSpec.DataType dataType : FieldSpec.DataType.values()) {
+ RelDataType basicType;
+ RelDataType arrayType = null;
+ switch (dataType) {
+ case INT: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.INTEGER);
+ break;
+ }
+ case LONG: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.BIGINT);
+ break;
+ }
+ // Map float and double to the same RelDataType so that queries like
+ // `select count(*) from table where aFloatColumn = 0.05` works correctly in multi-stage query engine.
+ //
+ // If float and double are mapped to different RelDataType,
+ // `select count(*) from table where aFloatColumn = 0.05` will be converted to
+ // `select count(*) from table where CAST(aFloatColumn as "DOUBLE") = 0.05`. While casting
+ // from float to double does not always produce the same double value as the original float value, this leads to
+ // wrong query result.
+ //
+ // With float and double mapped to the same RelDataType, the behavior in multi-stage query engine will be the
+ // same as the query in v1 query engine.
+ case FLOAT: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+ arrayType = javaTypeFactory.createSqlType(SqlTypeName.REAL);
+ break;
+ }
+ case DOUBLE: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+ break;
+ }
+ case BOOLEAN: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+ break;
+ }
+ case TIMESTAMP: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+ break;
+ }
+ case STRING:
+ case JSON: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.VARCHAR);
+ break;
+ }
+ case BYTES: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.VARBINARY);
+ break;
+ }
+ case BIG_DECIMAL: {
+ basicType = javaTypeFactory.createSqlType(SqlTypeName.DECIMAL);
+ break;
+ }
+ case LIST:
+ case STRUCT:
+ case MAP:
+ case UNKNOWN:
+ continue;
+ default:
+ String message = String.format("Unsupported type: %s ", dataType);
+ throw new UnsupportedOperationException(message);
+ }
+ if (arrayType == null) {
+ arrayType = basicType;
+ }
+ cases.add(new Object[]{dataType, basicType, arrayType, true});
+ cases.add(new Object[]{dataType, basicType, arrayType, false});
+ }
+ return cases.iterator();
+ }
+
+ @Test(dataProvider = "relDataTypeConversion")
+ public void testScalarTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+ boolean columnNullMode) {
+ TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+ Schema testSchema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("col", dataType)
+ .setEnableColumnBasedNullHandling(columnNullMode)
+ .build();
+ RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+ List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+ RelDataTypeField field = fieldList.get(0);
+ boolean colNullable = isColNullable(testSchema);
+ Assert.assertEquals(field.getType(), typeFactory.createTypeWithNullability(scalarType, colNullable));
+ }
+
+ @Test(dataProvider = "relDataTypeConversion")
+ public void testNullableScalarTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+ boolean columnNullMode) {
+ TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+ Schema testSchema = new Schema.SchemaBuilder()
+ .addDimensionField("col", dataType, field -> field.setNullable(true))
+ .setEnableColumnBasedNullHandling(columnNullMode)
+ .build();
+ RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+ List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+ RelDataTypeField field = fieldList.get(0);
+
+ boolean colNullable = isColNullable(testSchema);
+ RelDataType expectedType = typeFactory.createTypeWithNullability(scalarType, colNullable);
+
+ Assert.assertEquals(field.getType(), expectedType);
+ }
+
+
+ @Test(dataProvider = "relDataTypeConversion")
+ public void testNotNullableScalarTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+ boolean columnNullMode) {
+ TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+ Schema testSchema = new Schema.SchemaBuilder()
+ .addDimensionField("col", dataType, field -> field.setNullable(false))
+ .setEnableColumnBasedNullHandling(columnNullMode)
+ .build();
+ RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+ List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+ RelDataTypeField field = fieldList.get(0);
+
+ Assert.assertEquals(field.getType(), scalarType);
+ }
+
+ private boolean isColNullable(Schema schema) {
+ return schema.isEnableColumnBasedNullHandling() && schema.getFieldSpecFor("col").isNullable();
+ }
+
+ @Test(dataProvider = "relDataTypeConversion")
+ public void testArrayTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+ boolean columnNullMode) {
+ TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+ Schema testSchema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("col", dataType)
+ .setEnableColumnBasedNullHandling(columnNullMode)
+ .build();
+ RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+ List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+ RelDataTypeField field = fieldList.get(0);
+
+ boolean nullable = isColNullable(testSchema);
+ RelDataType expectedType =
+ typeFactory.createTypeWithNullability(typeFactory.createArrayType(arrayType, -1), nullable);
+
+ Assert.assertEquals(field.getType(), expectedType);
+ }
+
+ @Test(dataProvider = "relDataTypeConversion")
+ public void testNullableArrayTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+ boolean columnNullMode) {
+ TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+ Schema testSchema = new Schema.SchemaBuilder()
+ .addDimensionField("col", dataType, field -> {
+ field.setNullable(true);
+ field.setSingleValueField(false);
+ })
+ .setEnableColumnBasedNullHandling(columnNullMode)
+ .build();
+ RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+ List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+ RelDataTypeField field = fieldList.get(0);
+
+ boolean colNullable = isColNullable(testSchema);
+ RelDataType expectedType =
+ typeFactory.createTypeWithNullability(typeFactory.createArrayType(arrayType, -1), colNullable);
+
+ Assert.assertEquals(field.getType(), expectedType);
+ }
+
+ @Test(dataProvider = "relDataTypeConversion")
+ public void testNotNullableArrayTypes(FieldSpec.DataType dataType, RelDataType scalarType, RelDataType arrayType,
+ boolean columnNullMode) {
+ TypeFactory typeFactory = new TypeFactory(TYPE_SYSTEM);
+ Schema testSchema = new Schema.SchemaBuilder()
+ .addDimensionField("col", dataType, field -> {
+ field.setNullable(false);
+ field.setSingleValueField(false);
+ })
+ .setEnableColumnBasedNullHandling(columnNullMode)
+ .build();
+ RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
+ List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
+ RelDataTypeField field = fieldList.get(0);
+
+ RelDataType expectedType = typeFactory.createArrayType(arrayType, -1);
+
+ Assert.assertEquals(field.getType(), expectedType);
+ }
@Test
public void testRelDataTypeConversion() {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index f25984f5fd..5a36f30c5a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -62,7 +62,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
.addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
- .addMetric("col3", FieldSpec.DataType.INT, 0).setSchemaName("defaultSchemaName");
+ .addMetric("col3", FieldSpec.DataType.INT, 0).setSchemaName("defaultSchemaName")
+ .setEnableColumnBasedNullHandling(true);
}
public static List<GenericRow> buildRows(String tableName) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 8837e97a0e..4d6a79052c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.query.runtime.queries;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.math.DoubleMath;
import java.math.BigDecimal;
import java.sql.Connection;
@@ -204,9 +206,10 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
protected void compareRowEquals(ResultTable resultTable, List<Object[]> expectedRows, boolean keepOutputRowsInOrder) {
List<Object[]> resultRows = resultTable.getRows();
int numRows = resultRows.size();
- assertEquals(numRows, expectedRows.size(), String.format("Mismatched number of results. expected: %s, actual: %s",
- expectedRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n")),
- resultRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n"))));
+ assertEquals(numRows, expectedRows.size(),
+ String.format("Mismatched number of results.\nExpected Rows:\n%s\nActual Rows:\n%s",
+ expectedRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n")),
+ resultRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n"))));
DataSchema dataSchema = resultTable.getDataSchema();
resultRows.forEach(row -> canonicalizeRow(dataSchema, row));
@@ -219,12 +222,12 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
Object[] resultRow = resultRows.get(i);
Object[] expectedRow = expectedRows.get(i);
assertEquals(resultRow.length, expectedRow.length,
- String.format("Unexpected row size mismatch. Expected: %s, Actual: %s", Arrays.toString(expectedRow),
- Arrays.toString(resultRow)));
+ String.format("Mismatched row size at row id: %d. Expected Row: %s, Actual Row: %s", i,
+ Arrays.toString(expectedRow), Arrays.toString(resultRow)));
for (int j = 0; j < resultRow.length; j++) {
assertTrue(typeCompatibleFuzzyEquals(dataSchema.getColumnDataType(j), resultRow[j], expectedRow[j]),
- "Not match at (" + i + "," + j + ")! Expected: " + Arrays.toString(expectedRow) + " Actual: "
- + Arrays.toString(resultRow));
+ String.format("Mismatched value at row id: %d, column id: %d. Expected Row: %s, Actual Row: %s", i, j,
+ Arrays.toString(expectedRow), Arrays.toString(resultRow)));
}
}
}
@@ -407,7 +410,11 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
// TODO: ts is built-in, but we should allow user overwrite
builder.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:SECONDS");
builder.setSchemaName(schemaName);
- return builder.build();
+ Schema schema = builder.build();
+ for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) {
+ schema.getFieldSpecMap().get(columnAndType._name).setNotNull(columnAndType._notNull);
+ }
+ return schema;
}
protected List<GenericRow> toRow(List<QueryTestCase.ColumnAndType> columnAndTypes, List<List<Object>> value) {
@@ -540,7 +547,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
@JsonProperty("queries")
public List<Query> _queries;
@JsonProperty("extraProps")
- public Map<String, Object> _extraProps = Collections.emptyMap();
+ public ExtraProperties _extraProps = new ExtraProperties();
public static class Table {
@JsonProperty("schema")
@@ -581,6 +588,39 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
String _type;
@JsonProperty("isSingleValue")
boolean _isSingleValue = true;
+ @JsonProperty("notNull")
+ boolean _notNull = false;
+ }
+
+ public static class ExtraProperties {
+ private boolean _enableColumnBasedNullHandling = false;
+ private boolean _noEmptySegment = false;
+ private final Map<String, JsonNode> _unknownProps = new HashMap<>();
+
+ public boolean isEnableColumnBasedNullHandling() {
+ return _enableColumnBasedNullHandling;
+ }
+
+ public void setEnableColumnBasedNullHandling(boolean enableColumnBasedNullHandling) {
+ _enableColumnBasedNullHandling = enableColumnBasedNullHandling;
+ }
+
+ public boolean isNoEmptySegment() {
+ return _noEmptySegment;
+ }
+
+ public void setNoEmptySegment(boolean noEmptySegment) {
+ _noEmptySegment = noEmptySegment;
+ }
+
+ public Map<String, JsonNode> getUnknownProps() {
+ return _unknownProps;
+ }
+
+ @JsonAnySetter
+ public void setAny(String key, JsonNode value) {
+ _unknownProps.put(key, value);
+ }
}
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 1ebf761920..bb6744084a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -31,7 +31,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Matcher;
@@ -54,7 +53,6 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -69,7 +67,6 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Pattern TABLE_NAME_REPLACE_PATTERN = Pattern.compile("\\{([\\w\\d]+)\\}");
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
- private static final Random RANDOM = new Random(42);
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
private static final String IGNORE_FILTER_PROPERTY = "pinot.runIgnored";
private static final int DEFAULT_NUM_PARTITIONS = 4;
@@ -109,17 +106,17 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
// table will be registered on both servers.
Map<String, Schema> schemaMap = new HashMap<>();
for (Map.Entry<String, QueryTestCase.Table> tableEntry : testCase._tables.entrySet()) {
- boolean allowEmptySegment = !BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps, "noEmptySegment"));
+ boolean allowEmptySegment = !testCase._extraProps.isNoEmptySegment();
String tableName = testCaseName + "_" + tableEntry.getKey();
// Testing only OFFLINE table b/c Hybrid table test is a special case to test separately.
String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
Schema pinotSchema = constructSchema(tableName, tableEntry.getValue()._schema);
+ pinotSchema.setEnableColumnBasedNullHandling(testCase._extraProps.isEnableColumnBasedNullHandling());
schemaMap.put(tableName, pinotSchema);
factory1.registerTable(pinotSchema, offlineTableName);
factory2.registerTable(pinotSchema, offlineTableName);
List<QueryTestCase.ColumnAndType> columnAndTypes = tableEntry.getValue()._schema;
List<GenericRow> genericRows = toRow(columnAndTypes, tableEntry.getValue()._inputs);
-
// generate segments and dump into server1 and server2
List<String> partitionColumns = tableEntry.getValue()._partitionColumns;
int numPartitions = tableEntry.getValue()._partitionCount == null ? DEFAULT_NUM_PARTITIONS
@@ -139,14 +136,17 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
partitionIdToRowsMap.add(new ArrayList<>());
}
- for (GenericRow row : genericRows) {
+ int numRows = genericRows.size();
+ for (int i = 0; i < numRows; i++) {
+ GenericRow row = genericRows.get(i);
if (row == SEGMENT_BREAKER_ROW) {
addSegments(factory1, factory2, offlineTableName, allowEmptySegment, partitionIdToRowsMap,
partitionIdToSegmentsMap, numPartitions);
} else {
int partitionId;
if (partitionColumns == null) {
- partitionId = RANDOM.nextInt(numPartitions);
+ // Round-robin when there is no partition column
+ partitionId = i % numPartitions;
} else {
int hashCode = 0;
for (String field : partitionColumns) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index fd3ef7a374..c3a34cbd20 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -65,7 +65,7 @@ public class MockInstanceDataManagerFactory {
// Key is registered table (with or without type)
private final Map<String, Schema> _registeredSchemaMap;
- private String _serverName;
+ private final String _serverName;
public MockInstanceDataManagerFactory(String serverName) {
_serverName = serverName;
@@ -162,8 +162,8 @@ public class MockInstanceDataManagerFactory {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
// TODO: plugin table config constructor
- TableConfig tableConfig =
- new TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts").build();
+ TableConfig tableConfig = new TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts")
+ .setNullHandlingEnabled(true).build();
Schema schema = _schemaMap.get(rawTableName);
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(indexDir.getPath());
@@ -174,7 +174,7 @@ public class MockInstanceDataManagerFactory {
try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
driver.init(config, recordReader);
driver.build();
- return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap);
+ return ImmutableSegmentLoader.load(new File(indexDir, segmentName), ReadMode.mmap, tableConfig, schema);
} catch (Exception e) {
throw new RuntimeException("Unable to construct immutable segment from records", e);
}
diff --git a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
index 973d83b70c..cb146b149b 100644
--- a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
+++ b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
@@ -122,9 +122,9 @@
"outputs": [[8]]
},
{
- "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 5], [a, 4]]",
+ "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 6], [a, 4]]",
"sql": "SELECT groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY groupingCol",
- "outputs": [["b", 5], ["a", 4]]
+ "outputs": [["b", 6], ["a", 4]]
},
{
"sql": "SELECT l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
@@ -135,9 +135,9 @@
"outputs": [["b", 6], ["a", 6]]
},
{
- "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 5], [a, 4]]",
+ "comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 6], [a, 4]]",
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY groupingCol",
- "outputs": [["b", 5], ["a", 4]]
+ "outputs": [["b", 6], ["a", 4]]
},
{
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
diff --git a/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json b/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
index fdccd9cee8..3ed27c8a82 100644
--- a/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
+++ b/pinot-query-runtime/src/test/resources/queries/MetadataTestQuery.json
@@ -60,7 +60,7 @@
}
],
"extraProps": {
- "noEmptySegment": "true"
+ "noEmptySegment": true
}
}
}
diff --git a/pinot-query-runtime/src/test/resources/queries/NullHandling.json b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
index 334eb5011a..ee15c88f34 100644
--- a/pinot-query-runtime/src/test/resources/queries/NullHandling.json
+++ b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
@@ -66,5 +66,341 @@
"keepOutputRowOrder": true
}
]
+ },
+ "table_level_null_handling": {
+ "tables": {
+ "tbl1" : {
+ "schema": [
+ {"name": "strCol1", "type": "STRING"},
+ {"name": "intCol1", "type": "INT"},
+ {"name": "nIntCol1", "type": "INT", "notNull": false},
+ {"name": "nnIntCol1", "type": "INT", "notNull": true},
+ {"name": "strCol2", "type": "STRING"},
+ {"name": "nStrCol2", "type": "STRING", "notNull": false},
+ {"name": "nnStrCol2", "type": "STRING", "notNull": true}
+ ],
+ "inputs": [
+ ["foo", 1, 1, 1, "foo", "foo", "foo"],
+ ["bar", 2, 2, 2, "alice", "alice", "alice"],
+ ["alice", 2, 2, 2, null, null, null],
+ [null, 4, 4, 4, null, null, null],
+ ["bob", null, null, null, null, null, null]
+ ]
+ },
+ "tbl2" : {
+ "schema": [
+ {"name": "strCol1", "type": "STRING"},
+ {"name": "strCol2", "type": "STRING"},
+ {"name": "intCol1", "type": "INT"},
+ {"name": "doubleCol1", "type": "DOUBLE"}
+ ],
+ "inputs": [
+ ["foo", "bob", 3, 3.1416],
+ ["alice", "alice", 4, 2.7183],
+ [null, null, null, null]
+ ]
+ },
+ "tbl3": {
+ "schema": [
+ {"name": "strCol1", "type": "STRING"},
+ {"name": "intCol1", "type": "INT"},
+ {"name": "strCol2", "type": "STRING"}
+ ],
+ "inputs": [
+ ["foo", 1, "foo"],
+ ["bar", 2, "foo"]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "join 3 tables, mixed join conditions with null non-matching",
+ "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+ },
+ {
+ "description": "join 3 tables, mixed join condition with null matching",
+ "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 != {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+ },
+ {
+ "description": "join 2 tables, mixed join conditions of null matching or non-matching",
+ "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 AND {tbl1}.strCol1 = {tbl2}.strCol1"
+ },
+
+ {
+ "description": "nIntCol1 IS NULL is always false",
+ "comment": "When column level nullability is disabled, V2 considers all columns not nullable",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NULL",
+ "h2Sql": "SELECT 0"
+ },
+ {
+ "description": "nIntCol1 IS NOT NULL is always true",
+ "comment": "When column level nullability is disabled, V2 considers all columns not nullable",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL",
+ "h2Sql": "SELECT count(*) FROM {tbl1}"
+ },
+ {
+ "description": "nnIntCol1 IS NULL is always false",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NULL",
+ "h2Sql": "SELECT 0"
+ },
+ {
+ "description": "nnIntCol1 IS NOT NULL is always true",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL",
+ "h2Sql": "SELECT count(*) FROM {tbl1}"
+ },
+ {
+ "description": "intCol1 IS NULL is always false",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NULL",
+ "h2Sql": "SELECT 0"
+ },
+ {
+ "description": "intCol1 IS NOT NULL is always true",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NOT NULL",
+ "h2Sql": "SELECT count(*) FROM {tbl1}"
+ },
+
+ {
+ "description": "Leaf stages may return nulls",
+ "sql": "SET enableNullHandling=true; SELECT nStrCol2 FROM {tbl1} WHERE nIntCol1 = 4 and nStrCol2 IS NOT NULL",
+ "h2Sql": "SELECT null"
+ },
+
+ {
+ "description": "When projected with enableNullHandling=true, intCol1 is considered null",
+ "sql": "SET enableNullHandling=true; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=false, intCol1 is considered -2147483648",
+ "sql": "SET enableNullHandling=false; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=true, nIntCol1 is considered null",
+ "sql": "SET enableNullHandling=true; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=false, nIntCol1 is considered -2147483648",
+ "sql": "SET enableNullHandling=false; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=true, nnIntCol1 is considered null",
+ "sql": "SET enableNullHandling=true; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=false, nnIntCol1 is considered -2147483648",
+ "sql": "SET enableNullHandling=false; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+
+ {
+ "description": "aggregate with null checkers",
+ "sql": "SET enableNullHandling=true; SELECT SUM(intCol1) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+ },
+
+ {
+ "description": "null checks in aggregate are skipped in implicit nullable columns",
+ "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2",
+ "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+ },
+ {
+ "description": "null checks in aggregate are skipped in explicit nullable columns",
+ "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL GROUP BY strCol2",
+ "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+ },
+ {
+ "description": "null checks in aggregate are skipped in not nullable columns",
+ "comment": "Given nnIntCol1 is not nullable, V2 removes the IS NOT NULL predicate",
+ "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL GROUP BY strCol2",
+ "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+ },
+
+ {
+ "description": "aggregate with null checkers on key from group set are skipped on not nullable columns",
+ "comment": "Given nnStrCol2 is not nullable, V2 returns empty set",
+ "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nnStrCol2 IS NULL GROUP BY strCol2",
+ "h2Sql": "SELECT 0 FROM {tbl1} WHERE false GROUP BY strCol2"
+ },
+ {
+ "description": "aggregate with null checkers on key from group set are skipped on nullable columns",
+ "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nStrCol2 IS NULL GROUP BY nStrCol2",
+ "h2Sql": "SELECT 0 FROM {tbl1} WHERE false GROUP BY strCol2"
+ }
+ ]
+ },
+ "column_level_null_handling": {
+ "extraProps": {
+ "enableColumnBasedNullHandling": true
+ },
+ "tables": {
+ "tbl1" : {
+ "schema": [
+ {"name": "strCol1", "type": "STRING"},
+ {"name": "intCol1", "type": "INT"},
+ {"name": "nIntCol1", "type": "INT", "notNull": false},
+ {"name": "nnIntCol1", "type": "INT", "notNull": true},
+ {"name": "strCol2", "type": "STRING"},
+ {"name": "nStrCol2", "type": "STRING", "notNull": false},
+ {"name": "nnStrCol2", "type": "STRING", "notNull": true}
+ ],
+ "inputs": [
+ ["foo", 1, 1, 1, "foo", "foo", "foo"],
+ ["bar", 2, 2, 2, "alice", "alice", "alice"],
+ ["alice", 2, 2, 2, null, null, null],
+ [null, 4, 4, 4, null, null, null],
+ ["bob", null, null, null, null, null, null]
+ ]
+ },
+ "tbl2" : {
+ "schema": [
+ {"name": "strCol1", "type": "STRING"},
+ {"name": "strCol2", "type": "STRING", "notNull": false},
+ {"name": "intCol1", "type": "INT"},
+ {"name": "doubleCol1", "type": "DOUBLE", "notNull": false}
+ ],
+ "inputs": [
+ ["foo", "bob", 3, 3.1416],
+ ["alice", "alice", 4, 2.7183],
+ ["bob", null, 5, null]
+ ]
+ },
+ "tbl3" : {
+ "schema": [
+ {"name": "strCol1", "type": "STRING"},
+ {"name": "intCol1", "type": "INT"},
+ {"name": "strCol2", "type": "STRING"}
+ ],
+ "inputs": [
+ ["foo", 1, "foo"],
+ ["bar", 2, "foo"]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "join 3 tables, mixed join conditions with null non-matching",
+ "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+ },
+ {
+ "description": "join 3 tables, mixed join condition with null matching",
+ "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 != {tbl2}.doubleCol1 JOIN {tbl3} ON {tbl1}.strCol1 = {tbl3}.strCol2"
+ },
+ {
+ "description": "join 2 tables, mixed join conditions of null matching or non-matching",
+ "sql": "SET enableNullHandling=true; SELECT * FROM {tbl1} JOIN {tbl2} ON {tbl1}.intCol1 > {tbl2}.doubleCol1 AND {tbl1}.strCol1 = {tbl2}.strCol1"
+ },
+ {
+ "description": "aggregate with null checkers",
+ "sql": "SET enableNullHandling=true; SELECT SUM(intCol1) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+ },
+ {
+ "description": "aggregate with null checkers",
+ "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+ },
+ {
+ "description": "aggregate with null checkers on key from group set",
+ "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE strCol2 IS NULL GROUP BY strCol2"
+ },
+ {
+ "description": "select only with NULL handling",
+ "sql": "SET enableNullHandling=true; SELECT strCol1, intCol1, nIntCol1, nnIntCol1, strCol2, nStrCol2, nnStrCol2 FROM {tbl1} WHERE nStrCol2 IS NULL AND nIntCol1 IS NOT NULL",
+ "h2Sql": "SELECT strCol1, intCol1, nIntCol1, nnIntCol1, strCol2, nStrCol2, 'null' FROM {tbl1} WHERE nStrCol2 IS NULL AND nIntCol1 IS NOT NULL"
+ },
+
+ {
+ "description": "Leaf stages should not return nulls",
+ "comment": "Result is the string 'null', not the value null",
+ "sql": "SET enableNullHandling=true; SELECT nnStrCol2 FROM {tbl1} WHERE nIntCol1 = 4",
+ "h2Sql": "SELECT 'null'"
+ },
+
+ {
+ "description": "nIntCol1 IS NULL is honored",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NULL"
+ },
+ {
+ "description": "nIntCol1 IS NOT NULL is honored",
+ "comment": "When column level nullability is enabled, V2 honors the declared nullability of nIntCol1",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL"
+ },
+ {
+ "description": "nnIntCol1 IS NULL is always false",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NULL",
+ "h2Sql": "SELECT 0"
+ },
+ {
+ "description": "nnIntCol1 IS NOT NULL is always true",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL",
+ "h2Sql": "SELECT count(*) FROM {tbl1}"
+ },
+ {
+ "description": "(IS NULL) Column with no explicit nullability are treated as nullable",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NULL"
+ },
+ {
+ "description": "(IS NOT NULL) Column with no explicit nullability are treated as nullable",
+ "sql": "SET enableNullHandling=true; SELECT count(*) FROM {tbl1} WHERE intCol1 IS NOT NULL"
+ },
+
+ {
+ "description": "aggregate with null checkers on key from group set are skipped on not nullable columns",
+ "comment": "Given nnStrCol2 is explicitly not nullable, V2 considers the where predicate always false",
+ "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nnStrCol2 IS NULL GROUP BY strCol2",
+ "h2Sql": "SELECT COUNT(*) FROM {tbl1} WHERE false GROUP BY strCol2"
+ },
+ {
+ "description": "aggregate with null checkers on key from group set are honored on nullable columns",
+ "sql": "SET enableNullHandling=true; SELECT COUNT(*) FROM {tbl1} WHERE nStrCol2 IS NULL GROUP BY nStrCol2"
+ },
+
+ {
+ "description": "null checks in aggregate are honored in implicit nullable columns",
+ "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE intCol1 IS NOT NULL GROUP BY strCol2"
+ },
+ {
+ "description": "null checks in aggregate are honored in explicit nullable columns",
+ "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nIntCol1 IS NOT NULL GROUP BY strCol2"
+ },
+ {
+ "description": "null checks in aggregate are skipped in not nullable columns",
+ "comment": "Given nnIntCol1 is not nullable, V2 removes the IS NOT NULL predicate",
+ "sql": "SET enableNullHandling=true; SELECT strCol2, COUNT(*) FROM {tbl1} WHERE nnIntCol1 IS NOT NULL GROUP BY strCol2",
+ "h2Sql": "SELECT strCol2, COUNT(*) FROM {tbl1} GROUP BY strCol2"
+ },
+
+ {
+ "description": "When projected with enableNullHandling=true, intCol1 is considered null",
+ "sql": "SET enableNullHandling=true; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=false, intCol1 is considered -2147483648",
+ "sql": "SET enableNullHandling=false; SELECT intCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=true, nIntCol1 is considered null",
+ "sql": "SET enableNullHandling=true; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT null FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=false, nIntCol1 is considered -2147483648",
+ "sql": "SET enableNullHandling=false; SELECT nIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=true, nnIntCol1 is considered -2147483648",
+ "sql": "SET enableNullHandling=true; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+ },
+ {
+ "description": "When projected with enableNullHandling=false, nnIntCol1 is considered -2147483648",
+ "sql": "SET enableNullHandling=false; SELECT nnIntCol1 FROM {tbl1} WHERE strCol1 = 'bob'",
+ "h2Sql": "SELECT -2147483648 FROM {tbl1} WHERE strCol1 = 'bob'"
+ }
+ ]
}
-}
\ No newline at end of file
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 2f0ed2131b..359ea2dba8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -71,6 +72,18 @@ public class ImmutableSegmentLoader {
return load(indexDir, defaultIndexLoadingConfig, null, false);
}
+ /**
+ * Loads the segment with specified table config and schema.
+ * This method is used to access the segment without modifying it, i.e. in read-only mode.
+ */
+ public static ImmutableSegment load(File indexDir, ReadMode readMode, TableConfig tableConfig,
+ @Nullable Schema schema)
+ throws Exception {
+ IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig(tableConfig, schema);
+ defaultIndexLoadingConfig.setReadMode(readMode);
+ return load(indexDir, defaultIndexLoadingConfig, schema, false);
+ }
+
/**
* Loads the segment with empty schema but a specified IndexLoadingConfig.
* This method modifies the segment like to convert segment format, add or remove indices.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index c47d420c82..36f5d6e81c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -256,8 +256,8 @@ public class MutableSegmentImpl implements MutableSegment {
}
Set<IndexType> specialIndexes =
- Sets.newHashSet(StandardIndexes.dictionary(), // dictionaries implement other contract
- StandardIndexes.nullValueVector()); // null value vector implement other contract
+ Sets.newHashSet(StandardIndexes.dictionary(), // dictionary implements other contract
+ StandardIndexes.nullValueVector()); // null value vector implements other contract
// Initialize for each column
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
@@ -336,7 +336,7 @@ public class MutableSegmentImpl implements MutableSegment {
}
// Null value vector
- MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null;
+ MutableNullValueVector nullValueVector = isNullable(fieldSpec) ? new MutableNullValueVector() : null;
Map<IndexType, MutableIndex> mutableIndexes = new HashMap<>();
for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
@@ -401,6 +401,10 @@ public class MutableSegmentImpl implements MutableSegment {
}
}
+ private boolean isNullable(FieldSpec fieldSpec) {
+ return _schema.isEnableColumnBasedNullHandling() ? fieldSpec.isNullable() : _nullHandlingEnabled;
+ }
+
private <C extends IndexConfig> void addMutableIndex(Map<IndexType, MutableIndex> mutableIndexes,
IndexType<C, ?, ?> indexType, MutableIndexContext context, FieldIndexConfigs indexConfigs) {
MutableIndex mutableIndex = indexType.createMutableIndex(context, indexConfigs.getConfig(indexType));
@@ -658,7 +662,7 @@ public class MutableSegmentImpl implements MutableSegment {
}
// Update the null value vector even if a null value is somehow produced
- if (_nullHandlingEnabled && row.isNullValue(column)) {
+ if (indexContainer._nullValueVector != null && row.isNullValue(column)) {
indexContainer._nullValueVector.setNull(docId);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 10e0a5c1d0..3a7a96415a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -105,7 +105,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
private File _indexDir;
private int _totalDocs;
private int _docIdCounter;
- private boolean _nullHandlingEnabled;
@Override
public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
@@ -133,10 +132,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
for (String columnName : indexConfigs.keySet()) {
FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
- if (fieldSpec == null) {
- Preconditions.checkState(schema.hasColumn(columnName),
- "Cannot create index for column: %s because it is not in schema", columnName);
- }
+ Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in the schema", columnName);
if (fieldSpec.isVirtualColumn()) {
LOGGER.warn("Ignoring index creation for virtual column " + columnName);
continue;
@@ -207,7 +203,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
if (oldFwdCreator != null) {
Object fakeForwardValue = calculateRawValueForTextIndex(dictEnabledColumn, config, fieldSpec);
if (fakeForwardValue != null) {
- @SuppressWarnings("unchecked")
ForwardIndexCreator castedOldFwdCreator = (ForwardIndexCreator) oldFwdCreator;
SameValueForwardIndexCreator fakeValueFwdCreator =
new SameValueForwardIndexCreator(fakeForwardValue, castedOldFwdCreator);
@@ -218,9 +213,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
// Although NullValueVector is implemented as an index, it needs to be treated in a different way than other indexes
- _nullHandlingEnabled = _config.isNullHandlingEnabled();
- if (_nullHandlingEnabled) {
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ if (isNullable(fieldSpec)) {
// Initialize Null value vector map
String columnName = fieldSpec.getName();
_nullValueVectorCreatorMap.put(columnName, new NullValueVectorCreator(_indexDir, columnName));
@@ -228,6 +222,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
+ private boolean isNullable(FieldSpec fieldSpec) {
+ return _schema.isEnableColumnBasedNullHandling() ? fieldSpec.isNullable() : _config.isNullHandlingEnabled();
+ }
+
private FieldIndexConfigs adaptConfig(String columnName, FieldIndexConfigs config,
ColumnIndexCreationInfo columnIndexCreationInfo, SegmentGeneratorConfig segmentCreationSpec) {
FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder(config);
@@ -325,13 +323,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
- if (_nullHandlingEnabled) {
- for (Map.Entry<String, NullValueVectorCreator> entry : _nullValueVectorCreatorMap.entrySet()) {
- String columnName = entry.getKey();
- // If row has null value for given column name, add to null value vector
- if (row.isNullValue(columnName)) {
- _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
- }
+ for (Map.Entry<String, NullValueVectorCreator> entry : _nullValueVectorCreatorMap.entrySet()) {
+ // If row has null value for given column name, add to null value vector
+ if (row.isNullValue(entry.getKey())) {
+ entry.getValue().setNull(_docIdCounter);
}
}
@@ -369,7 +364,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
private void indexColumnValue(PinotSegmentColumnReader colReader,
Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String columnName, FieldSpec fieldSpec,
- SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos, NullValueVectorCreator nullVec)
+ SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos,
+ @Nullable NullValueVectorCreator nullVec)
throws IOException {
Object columnValueToIndex = colReader.getValue(sourceDocId);
if (columnValueToIndex == null) {
@@ -382,7 +378,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
}
- if (_nullHandlingEnabled) {
+ if (nullVec != null) {
if (colReader.isNull(sourceDocId)) {
nullVec.setNull(onDiskDocPos);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 7fe5bd3385..bdd54224e0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -133,6 +133,10 @@ public class IndexLoadingConfig {
this(instanceDataManagerConfig, tableConfig, null);
}
+ public IndexLoadingConfig(TableConfig tableConfig, @Nullable Schema schema) {
+ extractFromTableConfigAndSchema(tableConfig, schema);
+ }
+
public IndexLoadingConfig() {
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 9fc15e86b1..acfd4827c6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -530,9 +530,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
}
}
- if (_indexLoadingConfig.getTableConfig() != null
- && _indexLoadingConfig.getTableConfig().getIndexingConfig() != null
- && _indexLoadingConfig.getTableConfig().getIndexingConfig().isNullHandlingEnabled()) {
+ if (isNullable(fieldSpec)) {
if (!_segmentWriter.hasIndexFor(column, StandardIndexes.nullValueVector())) {
try (NullValueVectorCreator nullValueVectorCreator =
new NullValueVectorCreator(_indexDir, fieldSpec.getName())) {
@@ -550,6 +548,16 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
fieldSpec, true/*hasDictionary*/, dictionaryElementSize);
}
+ private boolean isNullable(FieldSpec fieldSpec) {
+ if (_schema.isEnableColumnBasedNullHandling()) {
+ return fieldSpec.isNullable();
+ } else {
+ return _indexLoadingConfig.getTableConfig() != null
+ && _indexLoadingConfig.getTableConfig().getIndexingConfig() != null
+ && _indexLoadingConfig.getTableConfig().getIndexingConfig().isNullHandlingEnabled();
+ }
+ }
+
/**
* Helper method to create the V1 indices (dictionary and forward index) for a column with derived values.
* TODO:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
index c468870821..9c01b3a321 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java
@@ -19,8 +19,10 @@
package org.apache.pinot.segment.local.segment.index.nullvalue;
+import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -33,15 +35,16 @@ import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.index.AbstractIndexType;
import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
import org.apache.pinot.segment.spi.index.IndexHandler;
import org.apache.pinot.segment.spi.index.IndexReaderFactory;
+import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -67,7 +70,7 @@ public class NullValueIndexType extends AbstractIndexType<IndexConfig, NullValue
@Override
public IndexConfig getDefaultConfig() {
- return IndexConfig.DISABLED;
+ return IndexConfig.ENABLED;
}
@Override
@@ -77,11 +80,27 @@ public class NullValueIndexType extends AbstractIndexType<IndexConfig, NullValue
@Override
public ColumnConfigDeserializer<IndexConfig> createDeserializer() {
- return IndexConfigDeserializer.ifIndexingConfig(
- IndexConfigDeserializer.alwaysCall((TableConfig tableConfig, Schema schema) ->
- tableConfig.getIndexingConfig().isNullHandlingEnabled()
- ? IndexConfig.ENABLED
- : IndexConfig.DISABLED));
+ return (TableConfig tableConfig, Schema schema) -> {
+ Collection<FieldSpec> allFieldSpecs = schema.getAllFieldSpecs();
+ Map<String, IndexConfig> configMap = Maps.newHashMapWithExpectedSize(allFieldSpecs.size());
+
+ boolean enableColumnBasedNullHandling = schema.isEnableColumnBasedNullHandling();
+ boolean nullHandlingEnabled = tableConfig.getIndexingConfig() != null
+ && tableConfig.getIndexingConfig().isNullHandlingEnabled();
+
+ for (FieldSpec fieldSpec : allFieldSpecs) {
+ IndexConfig indexConfig;
+ boolean enabled;
+ if (enableColumnBasedNullHandling) {
+ enabled = fieldSpec.isNullable();
+ } else {
+ enabled = nullHandlingEnabled;
+ }
+ indexConfig = enabled ? IndexConfig.ENABLED : IndexConfig.DISABLED;
+ configMap.put(fieldSpec.getName(), indexConfig);
+ }
+ return configMap;
+ };
}
public NullValueVectorCreator createIndexCreator(File indexDir, String columnName) {
@@ -116,10 +135,14 @@ public class NullValueIndexType extends AbstractIndexType<IndexConfig, NullValue
public NullValueVectorReader createIndexReader(SegmentDirectory.Reader segmentReader,
FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata)
throws IOException {
- if (!segmentReader.hasIndexFor(metadata.getColumnName(), StandardIndexes.nullValueVector())) {
+ IndexType<IndexConfig, NullValueVectorReader, ?> indexType = StandardIndexes.nullValueVector();
+ if (fieldIndexConfigs.getConfig(indexType).isDisabled()) {
+ return null;
+ }
+ if (!segmentReader.hasIndexFor(metadata.getColumnName(), indexType)) {
return null;
}
- PinotDataBuffer buffer = segmentReader.getIndexFor(metadata.getColumnName(), StandardIndexes.nullValueVector());
+ PinotDataBuffer buffer = segmentReader.getIndexFor(metadata.getColumnName(), indexType);
return new NullValueVectorReaderImpl(buffer);
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexTypeTest.java
new file mode 100644
index 0000000000..e8a15fc82f
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexTypeTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.nullvalue;
+
+import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class NullValueIndexTypeTest {
+
+ @DataProvider(name = "provideCases")
+ public Object[][] provideCases() {
+ return new Object[][]{
+ // This is the semantic table, assuming a null bitmap buffer exists in the segment
+ // enableColumnBasedNullHandling | table nullHandlingEnabled | column nullable | expected index config
+ new Object[]{false, false, false, IndexConfig.DISABLED}, new Object[]{
+ false, false, true, IndexConfig.DISABLED
+ }, new Object[]{false, true, false, IndexConfig.ENABLED}, new Object[]{
+ false, true, true, IndexConfig.ENABLED
+ },
+
+ new Object[]{true, false, false, IndexConfig.DISABLED}, new Object[]{true, false, true, IndexConfig.ENABLED},
+ new Object[]{true, true, false, IndexConfig.DISABLED}, new Object[]{true, true, true, IndexConfig.ENABLED}
+ };
+ }
+
+ public static class ConfTest extends AbstractSerdeIndexContract {
+
+ protected void assertEquals(IndexConfig expected) {
+ Assert.assertEquals(getActualConfig("dimStr", StandardIndexes.nullValueVector()), expected);
+ }
+
+ @Test(dataProvider = "provideCases", dataProviderClass = NullValueIndexTypeTest.class)
+ public void isEnabledWhenNullable(boolean enableColumnBasedNullHandling, boolean tableNullable,
+ boolean fieldNullable, IndexConfig expected) {
+ _schema.setEnableColumnBasedNullHandling(enableColumnBasedNullHandling);
+ _tableConfig.getIndexingConfig().setNullHandlingEnabled(tableNullable);
+
+ FieldSpec fieldSpec = _schema.getFieldSpecFor("dimStr");
+ fieldSpec.setNullable(fieldNullable);
+
+ assertEquals(expected);
+ }
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index 978862e600..a3dc1bbef5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -44,6 +44,7 @@ import org.apache.pinot.spi.utils.TimestampUtils;
* <p>- <code>IsSingleValueField</code>: single-value or multi-value field.
* <p>- <code>DefaultNullValue</code>: when no value found for this field, use this value. Stored in string format.
* <p>- <code>VirtualColumnProvider</code>: the virtual column provider to use for this field.
+ * <p>- <code>NotNull</code>: whether the column rejects null values. Defaults to false, i.e. the column is nullable.
*/
@SuppressWarnings("unused")
public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
@@ -99,6 +100,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
protected String _name;
protected DataType _dataType;
protected boolean _isSingleValueField = true;
+ protected boolean _notNull = false;
// NOTE: This only applies to STRING column, which is the max number of characters
private int _maxLength = DEFAULT_MAX_LENGTH;
@@ -300,6 +302,30 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
_transformFunction = transformFunction;
}
+ /**
+ * Returns whether the column is nullable or not.
+ */
+ @JsonIgnore
+ public boolean isNullable() {
+ return !_notNull;
+ }
+
+ /**
+ * @see #isNullable()
+ */
+ @JsonIgnore
+ public void setNullable(Boolean nullable) {
+ _notNull = !nullable;
+ }
+
+ public boolean isNotNull() {
+ return _notNull;
+ }
+
+ public void setNotNull(boolean notNull) {
+ _notNull = notNull;
+ }
+
/**
* Returns the {@link ObjectNode} representing the field spec.
* <p>Only contains fields with non-default value.
@@ -317,6 +343,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
}
appendDefaultNullValue(jsonObject);
appendTransformFunction(jsonObject);
+ jsonObject.put("notNull", _notNull);
return jsonObject;
}
@@ -381,7 +408,8 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
.isEqual(_isSingleValueField, that._isSingleValueField) && EqualityUtils
.isEqual(getStringValue(_defaultNullValue), getStringValue(that._defaultNullValue)) && EqualityUtils
.isEqual(_maxLength, that._maxLength) && EqualityUtils.isEqual(_transformFunction, that._transformFunction)
- && EqualityUtils.isEqual(_virtualColumnProvider, that._virtualColumnProvider);
+ && EqualityUtils.isEqual(_virtualColumnProvider, that._virtualColumnProvider)
+ && EqualityUtils.isEqual(_notNull, that._notNull);
}
@Override
@@ -393,6 +421,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
result = EqualityUtils.hashCodeOf(result, _maxLength);
result = EqualityUtils.hashCodeOf(result, _transformFunction);
result = EqualityUtils.hashCodeOf(result, _virtualColumnProvider);
+ result = EqualityUtils.hashCodeOf(result, _notNull);
return result;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 4bd63a16df..63add34987 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -64,6 +65,7 @@ public final class Schema implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
private String _schemaName;
+ private boolean _enableColumnBasedNullHandling;
private final List<DimensionFieldSpec> _dimensionFieldSpecs = new ArrayList<>();
private final List<MetricFieldSpec> _metricFieldSpecs = new ArrayList<>();
private TimeFieldSpec _timeFieldSpec;
@@ -165,6 +167,14 @@ public final class Schema implements Serializable {
_schemaName = schemaName;
}
+ public boolean isEnableColumnBasedNullHandling() {
+ return _enableColumnBasedNullHandling;
+ }
+
+ public void setEnableColumnBasedNullHandling(boolean enableColumnBasedNullHandling) {
+ _enableColumnBasedNullHandling = enableColumnBasedNullHandling;
+ }
+
public List<String> getPrimaryKeyColumns() {
return _primaryKeyColumns;
}
@@ -427,6 +437,7 @@ public final class Schema implements Serializable {
public ObjectNode toJsonObject() {
ObjectNode jsonObject = JsonUtils.newObjectNode();
jsonObject.put("schemaName", _schemaName);
+ jsonObject.set("enableColumnBasedNullHandling", JsonUtils.objectToJsonNode(_enableColumnBasedNullHandling));
if (!_dimensionFieldSpecs.isEmpty()) {
ArrayNode jsonArray = JsonUtils.newArrayNode();
for (DimensionFieldSpec dimensionFieldSpec : _dimensionFieldSpecs) {
@@ -520,6 +531,58 @@ public final class Schema implements Serializable {
return this;
}
+ public SchemaBuilder setEnableColumnBasedNullHandling(boolean enableColumnBasedNullHandling) {
+ _schema.setEnableColumnBasedNullHandling(enableColumnBasedNullHandling);
+ return this;
+ }
+
+ public SchemaBuilder addField(FieldSpec fieldSpec) {
+ _schema.addField(fieldSpec);
+ return this;
+ }
+
+ public SchemaBuilder addMetricField(String name, DataType dataType) {
+ return addMetricField(name, dataType, ignore -> {
+ });
+ }
+
+ public SchemaBuilder addMetricField(String name, DataType dataType, Consumer<MetricFieldSpec> customizer) {
+ MetricFieldSpec fieldSpec = new MetricFieldSpec();
+ return addField(fieldSpec, name, dataType, customizer);
+ }
+
+ public SchemaBuilder addDimensionField(String name, DataType dataType) {
+ return addDimensionField(name, dataType, ignore -> {
+ });
+ }
+
+ public SchemaBuilder addDimensionField(String name, DataType dataType, Consumer<DimensionFieldSpec> customizer) {
+ DimensionFieldSpec fieldSpec = new DimensionFieldSpec();
+ return addField(fieldSpec, name, dataType, customizer);
+ }
+
+ public SchemaBuilder addDateTimeField(String name, DataType dataType, String format, String granularity) {
+ return addDateTimeField(name, dataType, format, granularity, ignore -> {
+ });
+ }
+
+ public SchemaBuilder addDateTimeField(String name, DataType dataType, String format, String granularity,
+ Consumer<DateTimeFieldSpec> customizer) {
+ DateTimeFieldSpec fieldSpec = new DateTimeFieldSpec();
+ fieldSpec.setFormat(format);
+ fieldSpec.setGranularity(granularity);
+ return addField(fieldSpec, name, dataType, customizer);
+ }
+
+ private <E extends FieldSpec> SchemaBuilder addField(E fieldSpec, String name, DataType dataType,
+ Consumer<E> customizer) {
+ fieldSpec.setName(name);
+ fieldSpec.setDataType(dataType);
+ customizer.accept(fieldSpec);
+ _schema.addField(fieldSpec);
+ return this;
+ }
+
/**
* Add single value dimensionFieldSpec
*/
@@ -674,7 +737,8 @@ public final class Schema implements Serializable {
&& EqualityUtils.isEqual(_timeFieldSpec, that._timeFieldSpec)
&& EqualityUtils.isEqualIgnoreOrder(_dateTimeFieldSpecs, that._dateTimeFieldSpecs)
&& EqualityUtils.isEqualIgnoreOrder(_complexFieldSpecs, that._complexFieldSpecs)
- && EqualityUtils.isEqual(_primaryKeyColumns, that._primaryKeyColumns);
+ && EqualityUtils.isEqual(_primaryKeyColumns, that._primaryKeyColumns)
+ && EqualityUtils.isEqual(_enableColumnBasedNullHandling, that._enableColumnBasedNullHandling);
//@formatter:on
}
@@ -733,6 +797,7 @@ public final class Schema implements Serializable {
result = EqualityUtils.hashCodeOf(result, _dateTimeFieldSpecs);
result = EqualityUtils.hashCodeOf(result, _complexFieldSpecs);
result = EqualityUtils.hashCodeOf(result, _primaryKeyColumns);
+ result = EqualityUtils.hashCodeOf(result, _enableColumnBasedNullHandling);
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org