You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/10/12 05:24:16 UTC
[pinot] branch master updated: Add an option to disable the creation of the forward index for a column (#9333)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e813867985 Add an option to disable the creation of the forward index for a column (#9333)
e813867985 is described below
commit e813867985746e916c8e898a530002551b661496
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Tue Oct 11 22:24:11 2022 -0700
Add an option to disable the creation of the forward index for a column (#9333)
* Add the option to create columns without a forward index
* Fix unit test
* Set and unset flag in test
* Empty commit to trigger tests
* Empty commit to trigger tests
* Trying test changes to see if unit tests pass
* Add order by to tests for better determinism
* Change disk size used in OfflineClusterIntegrationTest:testAggregateMetadataAPI
* Empty commit to trigger tests
* Address review comments
* Empty commit to trigger tests
* Address review comments, add file for MV forward index disabled tests
* Enhance tests and add fixes and address some comments
* Address review comments, remove no-op reader, allow sorted column, remove metadata
* Address review comments
* Address review comments
* Empty-Commit
* Address review comments, remove default column handling for forwardIndexDisabled columns
* Fix formatting
* Fix formatting
* Update tests to check error message
* Empty commit to trigger tests
* Empty commit to trigger tests
* Add checks for forwardIndexDisabled for derived column handling
* Empty commit to trigger tests
* Address review comments
* Fix tests
---
.../org/apache/pinot/core/common/DataFetcher.java | 8 +-
.../pinot/core/minion/RawIndexConverter.java | 19 +-
.../operator/filter/ScanBasedFilterOperator.java | 4 +
.../ForwardIndexDisabledMultiValueQueriesTest.java | 885 ++++++++++++++
...ForwardIndexDisabledSingleValueQueriesTest.java | 1217 ++++++++++++++++++++
.../converter/stats/MutableColumnStatistics.java | 3 +
.../stats/MutableNoDictionaryColStatistics.java | 3 +
.../creator/impl/DefaultIndexCreatorProvider.java | 28 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 52 +
.../creator/impl/fwd/NoOpForwardIndexCreator.java | 64 +
.../index/column/PhysicalColumnIndexContainer.java | 17 +-
.../segment/index/datasource/BaseDataSource.java | 2 +-
.../segment/index/loader/IndexLoadingConfig.java | 53 +-
.../defaultcolumn/BaseDefaultColumnHandler.java | 78 ++
.../forward/FixedBitMVForwardIndexReader.java | 1 +
.../segment/readers/PinotSegmentColumnReader.java | 1 +
.../startree/v2/store/StarTreeLoaderUtils.java | 1 +
.../segment/local/utils/TableConfigUtils.java | 44 +
.../local/segment/index/loader/LoaderTest.java | 159 +++
.../index/loader/SegmentPreProcessorTest.java | 353 +++++-
.../BaseDefaultColumnHandlerTest.java | 2 +-
.../segment/local/utils/TableConfigUtilsTest.java | 94 ++
.../newColumnsSchemaWithForwardIndexDisabled.json | 77 ++
.../segment/spi/creator/IndexCreationContext.java | 25 +-
.../spi/creator/SegmentGeneratorConfig.java | 44 +
.../apache/pinot/spi/config/table/FieldConfig.java | 3 +
.../converter/DictionaryToRawIndexConverter.java | 19 +-
27 files changed, 3210 insertions(+), 46 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index f04e085a9e..4e2d13a72c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.common;
+import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.math.BigDecimal;
@@ -66,10 +67,13 @@ public class DataFetcher {
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
String column = entry.getKey();
DataSource dataSource = entry.getValue();
+ DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+ ForwardIndexReader<?> forwardIndexReader = dataSource.getForwardIndex();
+ Preconditions.checkState(forwardIndexReader != null,
+ "Forward index disabled for column: %s, cannot create DataFetcher!", column);
ColumnValueReader columnValueReader =
- new ColumnValueReader(dataSource.getForwardIndex(), dataSource.getDictionary());
+ new ColumnValueReader(forwardIndexReader, dataSource.getDictionary());
_columnValueReaderMap.put(column, columnValueReader);
- DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
if (!dataSourceMetadata.isSingleValue()) {
maxNumValuesPerMVEntry = Math.max(maxNumValuesPerMVEntry, dataSourceMetadata.getMaxNumValuesPerMVEntry());
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index f6d2a7d2a2..49a786c340 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.minion;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@@ -203,7 +204,9 @@ public class RawIndexConverter {
// Create the raw index
DataSource dataSource = _originalImmutableSegment.getDataSource(columnName);
- ForwardIndexReader reader = dataSource.getForwardIndex();
+ ForwardIndexReader forwardIndexReader = dataSource.getForwardIndex();
+ Preconditions.checkState(forwardIndexReader != null,
+ "Forward index disabled for column: %s, raw index conversion operation unsupported!", columnName);
Dictionary dictionary = dataSource.getDictionary();
assert dictionary != null;
DataType storedType = dictionary.getValueType();
@@ -213,36 +216,36 @@ public class RawIndexConverter {
IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata)
.withFieldSpec(new DimensionFieldSpec(columnName, storedType, columnMetadata.isSingleValue()))
.withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null));
- ForwardIndexReaderContext readerContext = reader.createContext()) {
+ ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
switch (storedType) {
case INT:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putInt(dictionary.getIntValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putInt(dictionary.getIntValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case LONG:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putLong(dictionary.getLongValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putLong(dictionary.getLongValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case FLOAT:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putFloat(dictionary.getFloatValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putFloat(dictionary.getFloatValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case DOUBLE:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putDouble(dictionary.getDoubleValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putDouble(dictionary.getDoubleValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case STRING:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putString(dictionary.getStringValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putString(dictionary.getStringValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case BYTES:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putBytes(dictionary.getBytesValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putBytes(dictionary.getBytesValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
index 126431c9ce..89305d24fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.operator.filter;
+import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.core.common.Operator;
@@ -43,6 +44,9 @@ public class ScanBasedFilterOperator extends BaseFilterOperator {
_dataSource = dataSource;
_numDocs = numDocs;
_nullHandlingEnabled = nullHandlingEnabled;
+ Preconditions.checkState(_dataSource.getForwardIndex() != null,
+ "Forward index disabled for column: %s, scan based filtering not supported!",
+ _dataSource.getDataSourceMetadata().getFieldSpec().getName());
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
new file mode 100644
index 0000000000..a1c0418b33
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
@@ -0,0 +1,885 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * The <code>ForwardIndexDisabledMultiValueQueriesTest</code> class sets up the index segment for the no forward
+ * index multi-value queries test.
+ * <p>There are totally 14 columns, 100000 records inside the original Avro file where 10 columns are selected to build
+ * the index segment. Selected columns information are as following:
+ * <ul>
+ * ColumnName, FieldType, DataType, Cardinality, IsSorted, HasInvertedIndex, IsMultiValue, FwdIndexDisabled: S1, S2
+ * <li>column1, METRIC, INT, 51594, F, F, F, F, F</li>
+ * <li>column2, METRIC, INT, 42242, F, F, F, F, F</li>
+ * <li>column3, DIMENSION, STRING, 5, F, T, F, F, F</li>
+ * <li>column5, DIMENSION, STRING, 9, F, F, F, F, F</li>
+ * <li>column6, DIMENSION, INT, 18499, F, T, T, T, T</li>
+ * <li>column7, DIMENSION, INT, 359, F, T, T, T, F</li>
+ * <li>column8, DIMENSION, INT, 850, F, T, F, F, F</li>
+ * <li>column9, METRIC, INT, 146, F, T, F, F, F</li>
+ * <li>column10, METRIC, INT, 3960, F, F, F, F, F</li>
+ * <li>daysSinceEpoch, TIME, INT, 1, T, F, F, F, F</li>
+ * </ul>
+ */
+public class ForwardIndexDisabledMultiValueQueriesTest extends BaseQueriesTest {
+ private static final String AVRO_DATA = "data" + File.separator + "test_data-mv.avro";
+ private static final String SEGMENT_NAME_1 = "testTable_1756015688_1756015688";
+ private static final String SEGMENT_NAME_2 = "testTable_1756015689_1756015689";
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
+ "ForwardIndexDisabledMultiValueQueriesTest");
+
+ private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
+
+ // Hard-coded query filter.
+ protected static final String FILTER = " WHERE column1 > 100000000"
+ + " AND column2 BETWEEN 20000000 AND 1000000000"
+ + " AND column3 <> 'w'"
+ + " AND (column6 < 500000 OR column7 NOT IN (225, 407))"
+ + " AND daysSinceEpoch = 1756015683";
+
+ private IndexSegment _indexSegment;
+ // Contains 2 identical index segments.
+ private List<IndexSegment> _indexSegments;
+
+ private TableConfig _tableConfig;
+ private List<String> _invertedIndexColumns;
+ private List<String> _forwardIndexDisabledColumns;
+
+ @BeforeClass
+ public void buildSegment()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ // Get resource file path.
+ URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
+ assertNotNull(resource);
+ String filePath = resource.getFile();
+
+ // Build the segment schema.
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
+ .addMetric("column2", FieldSpec.DataType.INT).addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+ .addMultiValueDimension("column6", FieldSpec.DataType.INT)
+ .addMultiValueDimension("column7", FieldSpec.DataType.INT)
+ .addSingleValueDimension("column8", FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
+ .addMetric("column10", FieldSpec.DataType.INT)
+ .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+
+ createSegment(filePath, SEGMENT_NAME_1, schema);
+ createSegment(filePath, SEGMENT_NAME_2, schema);
+
+ ImmutableSegment immutableSegment1 = loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
+ ImmutableSegment immutableSegment2 = loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
+
+ _indexSegment = immutableSegment1;
+ _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
+ }
+
+ private void createSegment(String filePath, String segmentName, Schema schema)
+ throws Exception {
+ // Create field configs for the no forward index columns
+ List<FieldConfig> fieldConfigList = new ArrayList<>();
+ fieldConfigList.add(new FieldConfig("column6", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
+ Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+ if (segmentName.equals(SEGMENT_NAME_1)) {
+ fieldConfigList.add(new FieldConfig("column7", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+ null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+
+ // Build table config based on segment 1 as it contains both columns under no forward index
+ _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList("column5"))
+ .setTableName("testTable").setTimeColumnName("daysSinceEpoch").setFieldConfigList(fieldConfigList).build();
+ }
+
+ // Create the segment generator config.
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
+ segmentGeneratorConfig.setInputFilePath(filePath);
+ segmentGeneratorConfig.setTableName("testTable");
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+ segmentGeneratorConfig.setSegmentName(segmentName);
+ _invertedIndexColumns = Arrays.asList("column3", "column6", "column7", "column8", "column9");
+ segmentGeneratorConfig.setInvertedIndexCreationColumns(_invertedIndexColumns);
+ _forwardIndexDisabledColumns = new ArrayList<>(Arrays.asList("column6", "column7"));
+ segmentGeneratorConfig.setForwardIndexDisabledColumns(_forwardIndexDisabledColumns);
+ // The segment generation code in SegmentColumnarIndexCreator will throw
+ // exception if start and end time in time column are not in acceptable
+ // range. For this test, we first need to fix the input avro data
+ // to have the time column values in allowed range. Until then, the check
+ // is explicitly disabled
+ segmentGeneratorConfig.setSkipTimeValueCheck(true);
+
+ // Build the index segment.
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ }
+
+ private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
+ throws Exception {
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+ indexLoadingConfig.setTableConfig(_tableConfig);
+ indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(_invertedIndexColumns));
+ indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>(_forwardIndexDisabledColumns));
+ indexLoadingConfig.setReadMode(ReadMode.heap);
+
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
+ indexLoadingConfig);
+
+ Map<String, ColumnMetadata> columnMetadataMap1 = immutableSegment.getSegmentMetadata().getColumnMetadataMap();
+ columnMetadataMap1.forEach((column, metadata) -> {
+ if (column.equals("column6") || column.equals("column7")) {
+ assertTrue(metadata.hasDictionary());
+ assertFalse(metadata.isSingleValue());
+ assertNull(immutableSegment.getForwardIndex(column));
+ } else {
+ assertNotNull(immutableSegment.getForwardIndex(column));
+ }
+ });
+
+ return immutableSegment;
+ }
+
+ @AfterClass
+ public void deleteAndDestroySegment() {
+ FileUtils.deleteQuietly(INDEX_DIR);
+ _indexSegments.forEach((IndexSegment::destroy));
+ }
+
+ @Override
+ protected String getFilter() {
+ return FILTER;
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+
+ @Test
+ public void testSelectStarQueries() {
+ // Select * without any filters
+ try {
+ getBrokerResponse(SELECT_STAR_QUERY);
+ Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+
+ // Select * with filters
+ try {
+ getBrokerResponse(SELECT_STAR_QUERY + FILTER);
+ Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("scan based filtering not supported!"));
+ }
+ }
+
+ @Test
+ public void testSelectQueries() {
+ {
+ // Selection query without filters including a column with forwardIndexDisabled enabled on both segments
+ String query = "SELECT column1, column5, column6, column9, column10 FROM testTable";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query without filters including a column with forwardIndexDisabled enabled on one segment
+ String query = "SELECT column1, column5, column7, column9, column10 FROM testTable";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query with filters including a column with forwardIndexDisabled enabled on both segments
+ String query = "SELECT column1, column5, column6, column9, column10 FROM testTable WHERE column6 = 1001";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query with filters including a column with forwardIndexDisabled enabled on one segment
+ String query = "SELECT column1, column5, column7, column9, column10 FROM testTable WHERE column7 = 2147483647";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query without filters and without columns with forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column10 FROM testTable ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400_120L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ // Column 1
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 5
+ assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+ }
+ {
+ // Transform function on a selection clause with a forwardIndexDisabled column in transform
+ String query = "SELECT ARRAYLENGTH(column6) from testTable";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column in transform");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query with filters (not including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column1 > 100000000"
+ + " AND column2 BETWEEN 20000000 AND 1000000000"
+ + " AND column3 <> 'w'"
+ + " AND daysSinceEpoch = 1756015683 ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 62_700L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 62_820);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 304_120L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 5
+ assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column6 = 1001 "
+ + "ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 8);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 8L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 32L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals(resultRow[0], 1849044734);
+ assertEquals((String) resultRow[1], "AKXcXcIqsqOJFsdwxZ");
+ assertEquals(resultRow[2], 674022574);
+ assertEquals(resultRow[3], 674022574);
+ }
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column7 != 201 "
+ + "ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 399_896L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400_016L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 5
+ assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column7 IN (201, 2147483647) "
+ + "ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 199_860L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 199_980L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 5
+ assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column6 NOT IN "
+ + "(1001, 2147483647) ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 174_552L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 174_672L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 5
+ assertEquals((String) firstRow[1], "EOFxevm");
+ }
+ {
+ // Query with literal only in SELECT
+ String query = "SELECT 'marvin' from testTable ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400_000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"'marvin'"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 1);
+ assertEquals((String) resultRow[0], "marvin");
+ }
+ }
+ {
+ // Selection query with '<' filter on a forwardIndexDisabled column without range index available
+ String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column6 < 2147483647 AND "
+ + "column6 >= 1001 ORDER BY column1";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a range query column without range index");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("scan based filtering not supported!"));
+ }
+ }
+ {
+ // Selection query with '>=' filter on a forwardIndexDisabled column without range index available
+ String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column7 > 201";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a range query column without range index");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("scan based filtering not supported!"));
+ }
+ }
+ {
+ // Select query with a filter on a column which doesn't have forwardIndexDisabled enabled
+ String query = "SELECT column1, column5, column9 from testTable WHERE column9 < 3890167 ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 48L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 128L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 400_000L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5", "column9"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT
+ }));
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1Value = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 3);
+ assertTrue((Integer) resultRow[0] >= previousColumn1Value);
+ previousColumn1Value = (Integer) resultRow[0];
+ assertEquals(resultRow[2], 3890166);
+ }
+ }
+ {
+ // Transform function on a filter clause for forwardIndexDisabled column in transform
+ String query = "SELECT column1, column10 from testTable WHERE ARRAYLENGTH(column6) = 2";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() != null
+ && brokerResponseNative.getProcessingExceptions().size() > 0);
+ }
+ }
+
+  @Test
+  public void testSelectWithDistinctQueries() {
+    // DISTINCT over a mix of regular columns (column1, column9) and a forwardIndexDisabled column (column6) is
+    // expected to be rejected with the DataFetcher creation error.
+    String distinctQuery = "SELECT DISTINCT column1, column6, column9 FROM testTable LIMIT 10";
+    try {
+      getBrokerResponse(distinctQuery);
+      Assert.fail("Query should fail since forwardIndexDisabled on a column in select distinct");
+    } catch (IllegalStateException expected) {
+      String message = expected.getMessage();
+      assertTrue(message.contains("Forward index disabled for column:")
+          && message.contains("cannot create DataFetcher!"));
+    }
+  }
+
+ @Test
+ public void testSelectWithGroupByOrderByQueries() {
+ {
+ // Select a mix of forwardIndexDisabled and non-forwardIndexDisabled columns with group by order by
+ String query = "SELECT column1, column6 FROM testTable GROUP BY column1, column6 ORDER BY column1, column6 "
+ + " LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Select forwardIndexDisabled columns with group by order by
+ String query = "SELECT column7, column6 FROM testTable GROUP BY column7, column6 ORDER BY column7, column6 "
+ + " LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Select non-forwardIndexDisabled columns with group by order by
+ String query = "SELECT column1, column5 FROM testTable GROUP BY column1, column5 ORDER BY column1, column5 "
+ + " LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 400000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 800000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}));
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousVal = -1;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ assertTrue((int) resultRow[0] > previousVal);
+ previousVal = (int) resultRow[0];
+ }
+ }
+ {
+ // Select forwardIndexDisabled columns using transform with group by order by
+ String query = "SELECT ARRAYLENGTH(column6) FROM testTable GROUP BY ARRAYLENGTH(column6) ORDER BY "
+ + "ARRAYLENGTH(column6) LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Test a select with a VALUEIN transform function with group by
+ String query = "SELECT VALUEIN(column6, '1001') from testTable WHERE column6 IN (1001) GROUP BY "
+ + "VALUEIN(column6, '1001') LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ }
+
+  @Test
+  public void testSelectWithAggregationQueries() {
+    {
+      // Aggregation functions the test expects to work directly on forwardIndexDisabled columns (no filter, no
+      // group by).
+      String query = "SELECT maxmv(column7), minmv(column6), minmaxrangemv(column6), distinctcountmv(column7), "
+          + "distinctcounthllmv(column6), distinctcountrawhllmv(column7) from testTable";
+      BrokerResponseNative response = getBrokerResponse(query);
+      assertTrue(response.getProcessingExceptions() == null || response.getProcessingExceptions().size() == 0);
+      assertEquals(response.getNumRowsResultSet(), 1);
+      assertEquals(response.getTotalDocs(), 400_000L);
+      assertEquals(response.getNumDocsScanned(), 400_000L);
+      assertEquals(response.getNumSegmentsProcessed(), 4L);
+      assertEquals(response.getNumSegmentsMatched(), 4L);
+      assertEquals(response.getNumEntriesScannedPostFilter(), 0L);
+      assertEquals(response.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(response.getProcessingExceptions());
+      assertEquals(response.getProcessingExceptions().size(), 0);
+
+      String[] expectedColumnNames = {
+          "maxmv(column7)", "minmv(column6)", "minmaxrangemv(column6)", "distinctcountmv(column7)",
+          "distinctcounthllmv(column6)", "distinctcountrawhllmv(column7)"
+      };
+      DataSchema.ColumnDataType[] expectedColumnTypes = {
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING
+      };
+      ResultTable resultTable = response.getResultTable();
+      assertEquals(resultTable.getDataSchema(), new DataSchema(expectedColumnNames, expectedColumnTypes));
+      for (Object[] row : resultTable.getRows()) {
+        assertEquals(row.length, 6);
+        assertEquals(row[0], 2.147483647E9);
+        assertEquals(row[1], 1001.0);
+        assertEquals(row[2], 2.147482646E9);
+        assertEquals(row[3], 359);
+        assertEquals(row[4], 20039L);
+      }
+    }
+    {
+      // Rejected usages of forwardIndexDisabled columns in aggregation queries, in this order:
+      // - aggregation functions that are not allowed (summv, avgmv);
+      // - allowed aggregations combined with GROUP BY (and optionally ORDER BY) on a regular column;
+      // - ORDER BY an allowed aggregation on a forwardIndexDisabled column;
+      // - allowed aggregations combined with a filter, which results in trying to scan and fails.
+      String[] rejectedQueries = {
+          "SELECT summv(column7), avgmv(column6) from testTable",
+          "SELECT column1, maxmv(column6) from testTable GROUP BY column1",
+          "SELECT column1, maxmv(column6) from testTable GROUP BY column1 ORDER BY column1",
+          "SELECT column1, max(column9) from testTable GROUP BY column1 ORDER BY minmv(column6)",
+          "SELECT maxmv(column7), minmv(column6) from testTable WHERE column7 = 201",
+          "SELECT max(column1), minmv(column6) from testTable WHERE column1 > 15935"
+      };
+      for (String rejectedQuery : rejectedQueries) {
+        try {
+          getBrokerResponse(rejectedQuery);
+          Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+        } catch (IllegalStateException e) {
+          String message = e.getMessage();
+          assertTrue(message.contains("Forward index disabled for column:")
+              && message.contains("cannot create DataFetcher!"));
+        }
+      }
+    }
+    {
+      // Aggregations on regular columns, filtered by an equality predicate on a forwardIndexDisabled column.
+      String query = "SELECT max(column1), sum(column9) from testTable WHERE column7 = 2147483647";
+      BrokerResponseNative response = getBrokerResponse(query);
+      assertTrue(response.getProcessingExceptions() == null || response.getProcessingExceptions().size() == 0);
+      assertEquals(response.getNumRowsResultSet(), 1);
+      assertEquals(response.getTotalDocs(), 400_000L);
+      assertEquals(response.getNumDocsScanned(), 199_756L);
+      assertEquals(response.getNumSegmentsProcessed(), 4L);
+      assertEquals(response.getNumSegmentsMatched(), 4L);
+      assertEquals(response.getNumEntriesScannedPostFilter(), 399_512L);
+      assertEquals(response.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(response.getProcessingExceptions());
+      assertEquals(response.getProcessingExceptions().size(), 0);
+
+      ResultTable resultTable = response.getResultTable();
+      DataSchema expectedSchema = new DataSchema(new String[]{"max(column1)", "sum(column9)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
+      assertEquals(resultTable.getDataSchema(), expectedSchema);
+      for (Object[] row : resultTable.getRows()) {
+        assertEquals(row.length, 2);
+        assertEquals(row[0], 2.147313491E9);
+        assertEquals(row[1], 1.38051889779548E14);
+      }
+    }
+    {
+      // Same filter as above, plus GROUP BY / ORDER BY on a regular column.
+      String query = "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 2147483647 GROUP BY "
+          + "column1 ORDER BY column1";
+      BrokerResponseNative response = getBrokerResponse(query);
+      assertTrue(response.getProcessingExceptions() == null || response.getProcessingExceptions().size() == 0);
+      assertEquals(response.getNumRowsResultSet(), 10);
+      assertEquals(response.getTotalDocs(), 400_000L);
+      assertEquals(response.getNumDocsScanned(), 199_756L);
+      assertEquals(response.getNumSegmentsProcessed(), 4L);
+      assertEquals(response.getNumSegmentsMatched(), 4L);
+      assertEquals(response.getNumEntriesScannedPostFilter(), 399_512L);
+      assertEquals(response.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(response.getProcessingExceptions());
+      assertEquals(response.getProcessingExceptions().size(), 0);
+
+      ResultTable resultTable = response.getResultTable();
+      DataSchema expectedSchema = new DataSchema(new String[]{"column1", "max(column1)", "sum(column9)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE,
+              DataSchema.ColumnDataType.DOUBLE});
+      assertEquals(resultTable.getDataSchema(), expectedSchema);
+
+      // Rows must come back strictly ascending on column1.
+      int lastColumn1 = -1;
+      for (Object[] row : resultTable.getRows()) {
+        assertEquals(row.length, 3);
+        int column1 = (int) row[0];
+        assertTrue(column1 > lastColumn1);
+        lastColumn1 = column1;
+      }
+    }
+    {
+      // More rejected usages, in this order: ORDER BY an aggregation on a forwardIndexDisabled column (with a
+      // filter and GROUP BY on a regular column), then a transform (ARRAYLENGTH) over a forwardIndexDisabled column
+      // inside an aggregation: plain, with GROUP BY, and with GROUP BY + ORDER BY.
+      String[] rejectedQueries = {
+          "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 201 GROUP BY "
+              + "column1 ORDER BY minmv(column6)",
+          "SELECT MAX(ARRAYLENGTH(column6)) from testTable LIMIT 10",
+          "SELECT column1, MAX(ARRAYLENGTH(column6)) from testTable GROUP BY column1 LIMIT 10",
+          "SELECT column1, MAX(ARRAYLENGTH(column6)) from testTable GROUP BY column1 ORDER BY column1 "
+              + "DESC LIMIT 10"
+      };
+      for (String rejectedQuery : rejectedQueries) {
+        try {
+          getBrokerResponse(rejectedQuery);
+          Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+        } catch (IllegalStateException e) {
+          String message = e.getMessage();
+          assertTrue(message.contains("Forward index disabled for column:")
+              && message.contains("cannot create DataFetcher!"));
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
new file mode 100644
index 0000000000..44536e0951
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
@@ -0,0 +1,1217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * The <code>ForwardIndexDisabledSingleValueQueriesTest</code> class sets up the index segments for the single-value
+ * queries test with the forward index disabled on some columns.
+ * <p>The original Avro file contains 18 columns and 30000 records; 11 of those columns are selected to build the
+ * index segments. For each selected column the list below gives: ColumnName, FieldType, DataType, Cardinality,
+ * IsSorted, HasInvertedIndex, FwdIndexDisabled in segment 1 (S1), FwdIndexDisabled in segment 2 (S2), HasRangeIndex.
+ * <ul>
+ * <li>column1, METRIC, INT, 6582, F, F, F, F, F</li>
+ * <li>column3, METRIC, INT, 21910, F, F, F, F, F</li>
+ * <li>column5, DIMENSION, STRING, 1, T, F, F, F, F</li>
+ * <li>column6, DIMENSION, INT, 608, F, T, T, T, T</li>
+ * <li>column7, DIMENSION, INT, 146, F, T, T, T, F</li>
+ * <li>column9, DIMENSION, INT, 1737, F, F, F, F, F</li>
+ * <li>column11, DIMENSION, STRING, 5, F, T, F, F, F</li>
+ * <li>column12, DIMENSION, STRING, 5, F, F, F, F, F</li>
+ * <li>column17, METRIC, INT, 24, F, T, F, F, F</li>
+ * <li>column18, METRIC, INT, 1440, F, T, F, F, F</li>
+ * <li>daysSinceEpoch, TIME, INT, 2, T, F, F, F, F</li>
+ * </ul>
+ * <p>Note: segment 2 is generated with the table config and forward-index-disabled column list built for segment 1,
+ * so column6 and column7 have no forward index in either segment.
+ */
+public class ForwardIndexDisabledSingleValueQueriesTest extends BaseQueriesTest {
+  private static final String AVRO_DATA = "data" + File.separator + "test_data-sv.avro";
+  private static final String SEGMENT_NAME_1 = "testTable_126164076_167572857";
+  private static final String SEGMENT_NAME_2 = "testTable_126164076_167572858";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
+      "ForwardIndexDisabledSingleValueQueriesTest");
+
+  private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
+
+  // Hard-coded query filter.
+  private static final String FILTER = " WHERE column1 > 100000000"
+      + " AND column3 BETWEEN 20000000 AND 1000000000"
+      + " AND column5 = 'gFuH'"
+      + " AND (column6 < 500000000 OR column11 NOT IN ('t', 'P'))"
+      + " AND daysSinceEpoch = 126164076";
+
+  // The segment built from SEGMENT_NAME_1; returned by getIndexSegment().
+  private IndexSegment _indexSegment;
+  // Both loaded segments (SEGMENT_NAME_1, SEGMENT_NAME_2). createSegment reuses the table config built for segment 1
+  // and always passes column6 and column7 as forward-index-disabled, so neither segment has a forward index for
+  // those two columns (loadSegmentWithMetadataChecks asserts this for each segment).
+  private List<IndexSegment> _indexSegments;
+
+  // Table config built while creating segment 1; reused when creating segment 2 and when loading both segments.
+  private TableConfig _tableConfig;
+  // Inverted-index and forward-index-disabled column lists, (re)assigned by createSegment and used when loading.
+  private List<String> _invertedIndexColumns;
+  private List<String> _forwardIndexDisabledColumns;
+
+  /**
+   * Builds two segments from the single-value Avro test data and loads them. Loading checks that column6 and column7
+   * have no forward index while every other column does.
+   */
+  @BeforeClass
+  public void buildAndLoadSegment()
+      throws Exception {
+    // Start from an empty working directory.
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    URL avroResource = getClass().getClassLoader().getResource(AVRO_DATA);
+    assertNotNull(avroResource);
+    String avroFilePath = avroResource.getFile();
+
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName("testTable")
+        .addMetric("column1", FieldSpec.DataType.INT)
+        .addMetric("column3", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column6", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column7", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column9", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column12", FieldSpec.DataType.STRING)
+        .addMetric("column17", FieldSpec.DataType.INT)
+        .addMetric("column18", FieldSpec.DataType.INT)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null)
+        .build();
+
+    // Segment 1 is generated first because createSegment only builds the shared table config for it.
+    createSegment(avroFilePath, SEGMENT_NAME_1, schema);
+    createSegment(avroFilePath, SEGMENT_NAME_2, schema);
+
+    ImmutableSegment firstSegment = loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
+    ImmutableSegment secondSegment = loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
+    _indexSegment = firstSegment;
+    _indexSegments = Arrays.asList(firstSegment, secondSegment);
+  }
+
+  /**
+   * Generates the segment {@code segmentName} from the Avro file at {@code filePath} into {@code INDEX_DIR}.
+   *
+   * <p>The table config marks column6 and column7 as forward-index-disabled through their field configs and adds a
+   * range index on column6. It is built only when creating {@code SEGMENT_NAME_1` and is then reused for every later
+   * segment, so segment 1 must be created first. Every segment is generated with the same inverted-index,
+   * forward-index-disabled (column6, column7) and range-index (column6) column lists.
+   *
+   * @param filePath path of the input Avro file
+   * @param segmentName name of the segment to generate
+   * @param schema schema of the segment
+   * @throws Exception if segment generation fails
+   */
+  private void createSegment(String filePath, String segmentName, Schema schema)
+      throws Exception {
+    if (segmentName.equals(SEGMENT_NAME_1)) {
+      // Dictionary-encoded field configs that disable the forward index for column6 and column7.
+      List<FieldConfig> fieldConfigList = new ArrayList<>();
+      fieldConfigList.add(new FieldConfig("column6", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+          null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+      fieldConfigList.add(new FieldConfig("column7", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+          null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+      _tableConfig =
+          new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
+              .setFieldConfigList(fieldConfigList).setRangeIndexColumns(Arrays.asList("column6")).build();
+    }
+    // Fail fast with a clear message instead of handing a null table config to SegmentGeneratorConfig.
+    assertNotNull(_tableConfig, "Segment " + SEGMENT_NAME_1 + " must be created before " + segmentName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(filePath);
+    segmentGeneratorConfig.setTableName("testTable");
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setSegmentName(segmentName);
+    // The segment generation code in SegmentColumnarIndexCreator throws if the start and end time of the time column
+    // are not in the acceptable range. The input Avro data would first need its time values fixed to be in the
+    // allowed range; until then the check is explicitly disabled.
+    segmentGeneratorConfig.setSkipTimeValueCheck(true);
+    _invertedIndexColumns = Arrays.asList("column6", "column7", "column11", "column17", "column18");
+    segmentGeneratorConfig.setInvertedIndexCreationColumns(_invertedIndexColumns);
+
+    _forwardIndexDisabledColumns = new ArrayList<>(Arrays.asList("column6", "column7"));
+    segmentGeneratorConfig.setForwardIndexDisabledColumns(_forwardIndexDisabledColumns);
+    segmentGeneratorConfig.setRangeIndexCreationColumns(Arrays.asList("column6"));
+
+    // Build the index segment.
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+  }
+
+  /**
+   * Loads the segment {@code segmentName} from {@code INDEX_DIR} in heap read mode and validates it. column6 and
+   * column7 must be dictionary-encoded, single-value and have no forward index; every other column must have a
+   * forward index.
+   *
+   * @param segmentName name of the segment directory under {@code INDEX_DIR}
+   * @return the loaded segment
+   * @throws Exception if loading fails
+   */
+  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
+      throws Exception {
+    IndexLoadingConfig loadingConfig = new IndexLoadingConfig();
+    loadingConfig.setTableConfig(_tableConfig);
+    loadingConfig.setInvertedIndexColumns(new HashSet<>(_invertedIndexColumns));
+    loadingConfig.setForwardIndexDisabledColumns(new HashSet<>(_forwardIndexDisabledColumns));
+    loadingConfig.setRangeIndexColumns(new HashSet<>(Arrays.asList("column6")));
+    loadingConfig.setReadMode(ReadMode.heap);
+
+    File segmentDir = new File(INDEX_DIR, segmentName);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDir, loadingConfig);
+
+    Map<String, ColumnMetadata> columnMetadataMap = segment.getSegmentMetadata().getColumnMetadataMap();
+    for (Map.Entry<String, ColumnMetadata> entry : columnMetadataMap.entrySet()) {
+      String column = entry.getKey();
+      boolean forwardIndexDisabled = column.equals("column6") || column.equals("column7");
+      if (!forwardIndexDisabled) {
+        assertNotNull(segment.getForwardIndex(column));
+        continue;
+      }
+      ColumnMetadata metadata = entry.getValue();
+      assertTrue(metadata.hasDictionary());
+      assertTrue(metadata.isSingleValue());
+      assertNull(segment.getForwardIndex(column));
+    }
+    return segment;
+  }
+
+ /**
+  * Tears down the test: destroys every loaded segment, then removes {@code INDEX_DIR} from disk.
+  */
+ @AfterClass
+ public void deleteAndDestroySegment() {
+   // Release the loaded segments before deleting the files they were loaded from; skip when setup never
+   // populated the list so that cleanup of the directory still happens.
+   if (_indexSegments != null) {
+     _indexSegments.forEach(IndexSegment::destroy);
+   }
+   FileUtils.deleteQuietly(INDEX_DIR);
+ }
+
+ /** Returns this test's shared {@code FILTER} clause. */
+ @Override
+ protected String getFilter() {
+   return FILTER;
+ }
+
+ /** Returns the single segment exposed to the base query test. */
+ @Override
+ protected IndexSegment getIndexSegment() {
+   return this._indexSegment;
+ }
+
+ /** Returns all segments the broker-level queries run against. */
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+   return this._indexSegments;
+ }
+
+ @Test
+ public void testSelectStarQueries() {
+ // Select * without any filters
+ try {
+ getBrokerResponse(SELECT_STAR_QUERY);
+ Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+
+ // Select * with filters
+ try {
+ getBrokerResponse(SELECT_STAR_QUERY + FILTER);
+ Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+
+ @Test
+ public void testSelectQueries() {
+ {
+ // Selection query without filters including a column with forwardIndexDisabled enabled on both segments
+ String query = "SELECT column1, column5, column6, column9, column11 FROM testTable";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query without filters including a column with forwardIndexDisabled enabled on one segment
+ String query = "SELECT column1, column5, column7, column9, column11 FROM testTable";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query with filters including a column with forwardIndexDisabled enabled on both segments
+ String query = "SELECT column1, column5, column6, column9, column11 FROM testTable WHERE column6 = 2147458029";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query with filters including a column with forwardIndexDisabled enabled on one segment
+ String query = "SELECT column1, column5, column7, column9, column11 FROM testTable WHERE column7 = 675695";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Selection query without filters and without columns with forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column11 FROM testTable ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_120L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals((String) resultRow[1], "gFuH");
+ // Column 1
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 11
+ assertEquals((String) firstRow[3], "o");
+ }
+ {
+ // Transform function on a selection clause with a forwardIndexDisabled column in transform
+ String query = "SELECT CONCAT(column6, column9, '-') from testTable";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in transform");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Transform function on a selection clause without a forwardIndexDisabled column in transform
+ String query = "SELECT CONCAT(column5, column9, '-') from testTable ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_080L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"concat(column5,column9,'-')"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 1);
+ assertTrue(resultRow[0].toString().startsWith("gFuH-"));
+ }
+ }
+ {
+ // Selection query with filters (not including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column1 > 100000000"
+ + " AND column3 BETWEEN 20000000 AND 1000000000"
+ + " AND column5 = 'gFuH'"
+ + " AND daysSinceEpoch = 126164076 ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 42_368L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 42_488);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 192744L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals((String) resultRow[1], "gFuH");
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 11
+ assertEquals((String) firstRow[3], "P");
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column6 = 2147458029 "
+ + "ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 16L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 64L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals(resultRow[0], 635553468);
+ assertEquals((String) resultRow[1], "gFuH");
+ assertEquals(resultRow[2], 705242697);
+ assertEquals((String) resultRow[3], "P");
+ }
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column7 != 675695 "
+ + "ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 119_908L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_028L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals((String) resultRow[1], "gFuH");
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 11
+ assertEquals((String) firstRow[3], "o");
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column7 IN (675695, 2137685743) "
+ + "ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 828L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 948L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals((String) resultRow[1], "gFuH");
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 11
+ assertEquals((String) firstRow[3], "P");
+ }
+ {
+ // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+ // forwardIndexDisabled enabled on either segment
+ String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column6 NOT IN "
+ + "(1689277, 2147458029) ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 119_980L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_100L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals((String) resultRow[1], "gFuH");
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 11
+ assertEquals((String) firstRow[3], "o");
+ }
+ {
+ // Query with literal only in SELECT
+ String query = "SELECT 'marvin' from testTable ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"'marvin'"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 1);
+ assertEquals((String) resultRow[0], "marvin");
+ }
+ }
+ {
+ // Selection query with '<' filter on a forwardIndexDisabled column with range index available
+ String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column6 < 2147458029 AND "
+ + "column6 > 1699000 ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 119_980L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_100L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousColumn1 = Integer.MIN_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 4);
+ assertEquals((String) resultRow[1], "gFuH");
+ assertTrue((Integer) resultRow[0] >= previousColumn1);
+ previousColumn1 = (Integer) resultRow[0];
+ }
+
+ Object[] firstRow = resultRows.get(0);
+ // Column 11
+ assertEquals((String) firstRow[3], "o");
+ }
+ {
+ // Selection query with '>=' filter on a forwardIndexDisabled column without range index available
+ String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column7 >= 676000";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a range query column without range index");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("scan based filtering not supported!"));
+ }
+ }
+ {
+ // Select query with a filter on a column which doesn't have forwardIndexDisabled enabled
+ String query = "SELECT column1, column5, column9 from testTable WHERE column9 < 50000 ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 4);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 12L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 120000L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5", "column9"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT
+ }));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 3);
+ }
+ }
+ {
+ // Transform function on a filter clause for forwardIndexDisabled column in transform
+ String query = "SELECT column1, column11 from testTable WHERE CONCAT(column6, column9, '-') = '1689277-11270'";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() != null
+ && brokerResponseNative.getProcessingExceptions().size() > 0);
+ }
+ {
+ // Transform function on a filter clause for a non-forwardIndexDisabled column in transform
+ String query = "SELECT column1, column11 from testTable WHERE CONCAT(column5, column9, '-') = 'gFuH-11270' "
+ + "ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 4);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 8L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 120000L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column11"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ }));
+ List<Object[]> resultRows = resultTable.getRows();
+ assertEquals(resultRows.size(), 4);
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ assertEquals(resultRow[0], 815409257);
+ assertEquals(resultRow[1], "P");
+ }
+ }
+ {
+ // Transform function on a filter clause for forwardIndexDisabled column in transform
+ String query = "SELECT column1, column11 from testTable WHERE CONCAT(ADD(column6, column1), column9, '-') = "
+ + "'1689277-11270'";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() != null
+ && brokerResponseNative.getProcessingExceptions().size() > 0);
+ }
+ {
+ // Transform function on a filter clause for a non-forwardIndexDisabled column in transform
+ String query = "SELECT column1, column11 from testTable WHERE CONCAT(column5, ADD(column9, column1), '-') = "
+ + "'gFuH-2.96708164E8' ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 28L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 56L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 120000L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column11"},
+ new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ }));
+ List<Object[]> resultRows = resultTable.getRows();
+ assertEquals(resultRows.size(), 10);
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ assertEquals(resultRow[0], 240528);
+ assertEquals(resultRow[1], "o");
+ }
+ }
+ }
+
+ /**
+  * SELECT DISTINCT must be rejected when it projects forward-index-disabled column6, and must succeed with the
+  * expected statistics and ordering when it projects only columns that keep their forward index.
+  */
+ @Test
+ public void testSelectWithDistinctQueries() {
+   String rejectedQuery = "SELECT DISTINCT column1, column6, column9 FROM testTable LIMIT 10";
+   try {
+     getBrokerResponse(rejectedQuery);
+     Assert.fail("Query should fail since forwardIndexDisabled on a column in select distinct");
+   } catch (IllegalStateException e) {
+     String errorMessage = e.getMessage();
+     assertTrue(errorMessage.contains("Forward index disabled for column:")
+         && errorMessage.contains("cannot create DataFetcher!"));
+   }
+
+   String supportedQuery = "SELECT DISTINCT column1, column5, column9 FROM testTable ORDER BY column1 LIMIT 10";
+   BrokerResponseNative response = getBrokerResponse(supportedQuery);
+   assertTrue(response.getProcessingExceptions() == null || response.getProcessingExceptions().size() == 0);
+   ResultTable table = response.getResultTable();
+   assertEquals(response.getNumRowsResultSet(), 10);
+   assertEquals(response.getTotalDocs(), 120_000L);
+   assertEquals(response.getNumDocsScanned(), 120_000L);
+   assertEquals(response.getNumSegmentsProcessed(), 4L);
+   assertEquals(response.getNumSegmentsMatched(), 4L);
+   assertEquals(response.getNumEntriesScannedPostFilter(), 360_000L);
+   assertEquals(response.getNumEntriesScannedInFilter(), 0L);
+   assertNotNull(response.getProcessingExceptions());
+   assertEquals(response.getProcessingExceptions().size(), 0);
+
+   DataSchema expectedSchema = new DataSchema(new String[]{"column1", "column5", "column9"},
+       new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+           DataSchema.ColumnDataType.INT});
+   assertEquals(table.getDataSchema(), expectedSchema);
+
+   // Rows must come back ordered by column1 and all carry column5 = "gFuH"
+   int previousColumn1 = Integer.MIN_VALUE;
+   for (Object[] row : table.getRows()) {
+     assertEquals(row.length, 3);
+     assertEquals(row[1], "gFuH");
+     int column1 = (Integer) row[0];
+     assertTrue(column1 >= previousColumn1);
+     previousColumn1 = column1;
+   }
+ }
+
+ @Test
+ public void testSelectWithGroupByOrderByQueries() {
+ {
+ // Select a mix of forwardIndexDisabled and non-forwardIndexDisabled columns with group by order by
+ String query = "SELECT column1, column6 FROM testTable GROUP BY column1, column6 ORDER BY column1, column6 "
+ + " LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Select forwardIndexDisabled columns with group by order by
+ String query = "SELECT column7, column6 FROM testTable GROUP BY column7, column6 ORDER BY column7, column6 "
+ + " LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Select non-forwardIndexDisabled columns with group by order by
+ String query = "SELECT column1, column5 FROM testTable GROUP BY column1, column5 ORDER BY column1, column5 "
+ + " LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}));
+ List<Object[]> resultRows = resultTable.getRows();
+ int previousVal = -1;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ assertEquals(resultRow[1], "gFuH");
+ assertTrue((int) resultRow[0] > previousVal);
+ previousVal = (int) resultRow[0];
+ }
+ }
+ {
+ // Select forwardIndexDisabled columns using transform with group by order by
+ String query = "SELECT CONCAT(column1, column6, '-') FROM testTable GROUP BY CONCAT(column1, column6, '-') "
+ + "ORDER BY CONCAT(column1, column6, '-') LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a transformed column in group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Select non-forwardIndexDisabled columns using transform with group by order by
+ String query = "SELECT CONCAT(column1, column5, '-') FROM testTable GROUP BY CONCAT(column1, column5, '-') "
+ + "ORDER BY CONCAT(column1, column5, '-') LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"concat(column1,column5,'-')"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 1);
+ assertTrue(resultRow[0].toString().endsWith("-gFuH"));
+ }
+ }
+ {
+ // Select forwardIndexDisabled columns using nested transform with group by order by
+ String query = "SELECT CONCAT(ADD(column1, column6), column5, '-') FROM testTable GROUP BY "
+ + "CONCAT(ADD(column1, column6), column5, '-') ORDER BY CONCAT(ADD(column1, column6), column5, '-') LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail for forwardIndexDisabled on a nested transformed column in group by order by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"));
+ }
+ }
+ {
+ // Select non-forwardIndexDisabled columns using nested transform with group by order by
+ String query = "SELECT CONCAT(ADD(column1, column9), column5, '-') FROM testTable GROUP BY "
+ + "CONCAT(ADD(column1, column9), column5, '-') ORDER BY CONCAT(ADD(column1, column9), column5, '-') LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 360000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"concat(add(column1,column9),column5,'-')"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 1);
+ assertTrue(resultRow[0].toString().endsWith("-gFuH"));
+ }
+ }
+ }
+
+ /**
+ * Verifies aggregation queries against a table where column6 and column7 have the forward index disabled.
+ * Aggregations such as MAX/MIN/COUNT/MINMAXRANGE/DISTINCTCOUNT* on those columns succeed when the query has no filter
+ * or grouping, and a filter on a forwardIndexDisabled column works when only non-forwardIndexDisabled columns are
+ * aggregated. Every other combination exercised here (SUM/AVG, GROUP BY, ORDER BY on an aggregation, transforms, or a
+ * filter combined with aggregations on forwardIndexDisabled columns) must fail with a "cannot create DataFetcher!"
+ * error.
+ */
+ @Test
+ public void testSelectWithAggregationQueries() {
+ {
+ // Allowed aggregation functions on forwardIndexDisabled columns
+ String query = "SELECT max(column7), min(column7), count(column7), minmaxrange(column7), "
+ + "distinctcount(column7), distinctcounthll(column6), distinctcountrawhll(column7), "
+ + "distinctcountsmarthll(column6) from testTable";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ // None of these aggregations scans any entries
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ // count(column7) is reported as count(*) in the result schema
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(column7)", "min(column7)", "count(*)",
+ "minmaxrange(column7)", "distinctcount(column7)", "distinctcounthll(column6)", "distinctcountrawhll(column7)",
+ "distinctcountsmarthll(column6)"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 8);
+ assertEquals(resultRow[0], 2.137685743E9);
+ assertEquals(resultRow[1], 675695.0);
+ assertEquals(resultRow[2], 120000L);
+ assertEquals(resultRow[3], 2.137010048E9);
+ assertEquals(resultRow[4], 146);
+ assertEquals(resultRow[5], 695L);
+ // resultRow[6] (distinctcountrawhll) is a STRING and is not compared here
+ assertEquals(resultRow[7], 608);
+ }
+ }
+ {
+ // Not allowed aggregation functions on forwardIndexDisabled columns
+ String query = "SELECT sum(column7), avg(column6) from testTable";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // MAX on a forwardIndexDisabled column (allowed without grouping) fails once grouped by a
+ // non-forwardIndexDisabled column
+ String query = "SELECT column1, max(column6) from testTable GROUP BY column1";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // MAX on a forwardIndexDisabled column (allowed without grouping) fails once grouped and ordered by a
+ // non-forwardIndexDisabled column
+ String query = "SELECT column1, max(column6) from testTable GROUP BY column1 ORDER BY column1";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Aggregation and group by only on non-forwardIndexDisabled columns, but ordering by an aggregation (MIN) on a
+ // forwardIndexDisabled column makes the query fail
+ String query = "SELECT column1, max(column9) from testTable GROUP BY column1 ORDER BY min(column6)";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Allowed aggregation functions on forwardIndexDisabled columns with a filter - results in trying to scan which
+ // fails
+ String query = "SELECT max(column7), min(column6) from testTable WHERE column7 = 675695";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Allowed aggregation functions on forwardIndexDisabled columns with a filter - results in trying to scan which
+ // fails
+ String query = "SELECT max(column1), min(column6) from testTable WHERE column1 > 675695";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+ // column
+ String query = "SELECT max(column1), sum(column9) from testTable WHERE column7 = 675695";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 92L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 184L);
+ // The filter on the forwardIndexDisabled column scans no entries
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(column1)", "sum(column9)"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ assertEquals(resultRow[0], 2.106334109E9);
+ assertEquals(resultRow[1], 5.7037961104E10);
+ }
+ }
+ {
+ // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+ // column and group by order by on non-forwardIndexDisabled column
+ String query = "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 675695 GROUP BY "
+ + "column1 ORDER BY column1";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 92L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 184L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "max(column1)", "sum(column9)"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE}));
+ List<Object[]> resultRows = resultTable.getRows();
+ // Group keys are distinct and ordered ascending, so each key must be strictly larger than the previous one
+ int previousVal = -1;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 3);
+ assertTrue((int) resultRow[0] > previousVal);
+ previousVal = (int) resultRow[0];
+ }
+ }
+ {
+ // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+ // column and group by on non-forwardIndexDisabled column order by on forwardIndexDisabled aggregation column
+ String query = "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 675695 GROUP BY "
+ + "column1 ORDER BY max(column6)";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Transform inside aggregation involving a forwardIndexDisabled column
+ String query = "SELECT MAX(ADD(column6, column9)) from testTable LIMIT 10";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Transform inside aggregation not involving any forwardIndexDisabled columns
+ String query = "SELECT MAX(ADD(column1, column9)) from testTable LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(add(column1,column9))"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 1);
+ assertEquals(resultRow[0], 4.264013718E9);
+ }
+ }
+ {
+ // Transform inside aggregation involving a forwardIndexDisabled column with group by
+ String query = "SELECT column1, MAX(ADD(column6, column9)) from testTable GROUP BY column1 LIMIT 10";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Transform inside aggregation not involving any forwardIndexDisabled column with group by
+ String query = "SELECT column1, MAX(ADD(column1, column9)) from testTable GROUP BY column1 LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "max(add(column1,column9))"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE}));
+ List<Object[]> resultRows = resultTable.getRows();
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ }
+ }
+ {
+ // Transform inside aggregation involving a forwardIndexDisabled column with group by order by
+ String query = "SELECT column1, MAX(ADD(column6, column9)) from testTable GROUP BY column1 ORDER BY column1 "
+ + "DESC LIMIT 10";
+ assertForwardIndexDisabledAggregationQueryFails(query);
+ }
+ {
+ // Transform inside aggregation not involving any forwardIndexDisabled column with group by order by
+ String query = "SELECT column1, MAX(ADD(column1, column9)) from testTable GROUP BY column1 ORDER BY column1 "
+ + "DESC LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "max(add(column1,column9))"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE}));
+ List<Object[]> resultRows = resultTable.getRows();
+ // Ordered by column1 DESC, so keys must be non-increasing
+ int previousVal = Integer.MAX_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ assertTrue((int) resultRow[0] <= previousVal);
+ previousVal = (int) resultRow[0];
+ }
+ }
+ }
+
+ /**
+ * Runs the given aggregation query and asserts that it is rejected with the "Forward index disabled for column:
+ * ... cannot create DataFetcher!" IllegalStateException. The actual exception message is included in the
+ * assertion failure so an unexpected error is easy to diagnose.
+ *
+ * @param query SQL query that is expected to be rejected
+ */
+ private void assertForwardIndexDisabledAggregationQueryFails(String query) {
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"), "Unexpected error message: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Verifies GROUP BY ... HAVING queries when column6 and column7 have the forward index disabled. A
+ * forwardIndexDisabled column referenced in the aggregation select list or the HAVING clause makes the query fail
+ * with a "cannot create DataFetcher!" error. Queries that touch only non-forwardIndexDisabled columns succeed.
+ */
+ @Test
+ public void testSelectWithAggregationGroupByHaving() {
+ {
+ // forwardIndexDisabled column used in HAVING clause
+ String query = "SELECT min(column7), max(column6) from testTable GROUP BY column1 HAVING min(column7) > 675695 "
+ + "ORDER BY column1 LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in HAVING clause");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"), "Unexpected error message: " + e.getMessage());
+ }
+ }
+ {
+ // forwardIndexDisabled column not used in HAVING clause but used in aggregation select
+ String query = "SELECT max(column6), min(column9) from testTable GROUP BY column1 HAVING min(column9) > 11270 "
+ + "ORDER BY column1 LIMIT 10";
+ try {
+ getBrokerResponse(query);
+ // column6 only appears in the aggregation select list; the HAVING clause uses column9
+ Assert.fail("Query should fail since forwardIndexDisabled on a column in aggregation select with group by");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("Forward index disabled for column:")
+ && e.getMessage().contains("cannot create DataFetcher!"), "Unexpected error message: " + e.getMessage());
+ }
+ }
+ {
+ // forwardIndexDisabled column not used in HAVING clause or aggregation select
+ String query = "SELECT min(column9), column1 from testTable GROUP BY column1, column9 HAVING min(column9) "
+ + "> 11270 ORDER BY column9 DESC LIMIT 10";
+ BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+ assertTrue(brokerResponseNative.getProcessingExceptions() == null
+ || brokerResponseNative.getProcessingExceptions().size() == 0);
+ ResultTable resultTable = brokerResponseNative.getResultTable();
+ assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+ assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+ assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+ assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+ assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+ assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+ assertNotNull(brokerResponseNative.getProcessingExceptions());
+ assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+ assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"min(column9)", "column1"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT}));
+ List<Object[]> resultRows = resultTable.getRows();
+ // column9 is a group key, so min(column9) equals column9 and follows the DESC ordering
+ double previousVal = Double.MAX_VALUE;
+ for (Object[] resultRow : resultRows) {
+ assertEquals(resultRow.length, 2);
+ assertTrue((double) resultRow[0] <= previousVal);
+ previousVal = (double) resultRow[0];
+ }
+ }
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
index 24ed15aa4c..cff6260b99 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.realtime.converter.stats;
+import com.google.common.base.Preconditions;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
@@ -145,6 +146,8 @@ public class MutableColumnStatistics implements ColumnStatistics {
// Iterate over all data to figure out whether or not it's in sorted order
MutableForwardIndex mutableForwardIndex = (MutableForwardIndex) _dataSource.getForwardIndex();
+ Preconditions.checkState(mutableForwardIndex != null,
+ String.format("Forward index should not be null for column: %s", dataSourceMetadata.getFieldSpec().getName()));
int numDocs = dataSourceMetadata.getNumDocs();
// Iterate with the sorted order if provided
if (_sortedDocIdIterationOrder != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index 55c714224e..c4a3c910b0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.realtime.converter.stats;
+import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.segment.spi.creator.ColumnStatistics;
@@ -36,6 +37,8 @@ public class MutableNoDictionaryColStatistics implements ColumnStatistics {
public MutableNoDictionaryColStatistics(DataSource dataSource) {
_dataSourceMetadata = dataSource.getDataSourceMetadata();
_forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+ // Fail fast if the data source exposes no forward index (presumably because it was disabled for this column),
+ // since this class keeps and relies on _forwardIndex. The message is passed as a Guava template so it is only
+ // formatted when the check actually fails, instead of on every construction.
+ Preconditions.checkState(_forwardIndex != null, "Forward index should not be null for column: %s",
+ _dataSourceMetadata.getFieldSpec().getName());
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
index 7cb36a4db6..f42966fcc9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloo
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.NoOpForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -74,6 +75,7 @@ public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
throws Exception {
if (!context.hasDictionary()) {
+ // Dictionary disabled columns
boolean deriveNumDocsPerChunk =
shouldDeriveNumDocsPerChunk(context.getFieldSpec().getName(), context.getColumnProperties());
int writerVersion = getRawIndexWriterVersion(context.getFieldSpec().getName(), context.getColumnProperties());
@@ -88,17 +90,25 @@ public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
context.getMaxRowLengthInBytes());
}
} else {
- if (context.getFieldSpec().isSingleValueField()) {
- if (context.isSorted()) {
- return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
- context.getCardinality());
+ // Dictionary enabled columns
+ if (context.forwardIndexDisabled() && !context.isSorted()) {
+ // Forward index disabled columns which aren't sorted
+ // Sorted columns treat this option as a no-op
+ return new NoOpForwardIndexCreator(context.getFieldSpec().isSingleValueField());
+ } else {
+ // Forward index enabled columns
+ if (context.getFieldSpec().isSingleValueField()) {
+ if (context.isSorted()) {
+ return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality());
+ } else {
+ return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality(), context.getTotalDocs());
+ }
} else {
- return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
- context.getCardinality(), context.getTotalDocs());
+ return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
}
- } else {
- return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
- context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
}
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index d27b33e7a5..ba6f668e36 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -35,6 +35,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.utils.FileUtils;
import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
@@ -174,6 +175,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
"Cannot create json index for column: %s because it is not in schema", columnName);
}
+ Set<String> forwardIndexDisabledColumns = new HashSet<>();
+ for (String columnName : _config.getForwardIndexDisabledColumns()) {
+ Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. Can't disable "
+ + "forward index creation for a column: %s that does not exist in schema", columnName));
+ forwardIndexDisabledColumns.add(columnName);
+ }
+
Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
for (String columnName : h3IndexConfigs.keySet()) {
Preconditions.checkState(schema.hasColumn(columnName),
@@ -196,6 +204,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName),
"Cannot create inverted index for raw index column: %s", columnName);
+ boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName);
+ validateForwardIndexDisabledIndexCompatibility(columnName, forwardIndexDisabled, dictEnabledColumn,
+ columnIndexCreationInfo, invertedIndexColumns, rangeIndexColumns, rangeIndexVersion, fieldSpec);
+
IndexCreationContext.Common context = IndexCreationContext.builder()
.withIndexDir(_indexDir)
.withCardinality(columnIndexCreationInfo.getDistinctValueCount())
@@ -208,6 +220,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
.withColumnIndexCreationInfo(columnIndexCreationInfo)
.sorted(columnIndexCreationInfo.isSorted())
.onHeap(segmentCreationSpec.isOnHeap())
+ .withforwardIndexDisabled(forwardIndexDisabled)
.build();
// Initialize forward index creator
ChunkCompressionType chunkCompressionType =
@@ -298,6 +311,45 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
+  /**
+   * Validates the compatibility of the other index configs when the forward index is disabled for a column. Does
+   * nothing when {@code forwardIndexDisabled} is false; otherwise throws on the first compatibility mismatch. The
+   * checks performed are:
+   * - Validate dictionary is enabled.
+   * - Validate inverted index is enabled.
+   * - Validate that either no range index exists for column or the range index version is
+   *   {@link BitSlicedRangeIndexCreator#VERSION} and the column isn't multi-value (since multi-value defaults to
+   *   index v1).
+   *
+   * @param columnName Name of the column
+   * @param forwardIndexDisabled Whether the forward index is disabled for column or not
+   * @param dictEnabledColumn Whether the column is dictionary enabled or not
+   * @param columnIndexCreationInfo Column index creation info (not consulted by the current checks)
+   * @param invertedIndexColumns Set of columns with inverted index enabled
+   * @param rangeIndexColumns Set of columns with range index enabled
+   * @param rangeIndexVersion Range index version
+   * @param fieldSpec FieldSpec of column
+   * @throws IllegalStateException if the forward index is disabled and another index config is incompatible with it
+   */
+  private void validateForwardIndexDisabledIndexCompatibility(String columnName, boolean forwardIndexDisabled,
+      boolean dictEnabledColumn, ColumnIndexCreationInfo columnIndexCreationInfo, Set<String> invertedIndexColumns,
+      Set<String> rangeIndexColumns, int rangeIndexVersion, FieldSpec fieldSpec) {
+    if (!forwardIndexDisabled) {
+      return;
+    }
+
+    // Guava message templates defer building the error text until a check actually fails.
+    Preconditions.checkState(dictEnabledColumn, "Cannot disable forward index for column %s without dictionary",
+        columnName);
+    Preconditions.checkState(invertedIndexColumns.contains(columnName),
+        "Cannot disable forward index for column %s without inverted index enabled", columnName);
+    if (rangeIndexColumns.contains(columnName)) {
+      Preconditions.checkState(fieldSpec.isSingleValueField(),
+          "Feature not supported for multi-value columns with range index. Cannot disable forward index "
+              + "for column %s. Disable range index on this column to use this feature", columnName);
+      Preconditions.checkState(rangeIndexVersion == BitSlicedRangeIndexCreator.VERSION,
+          "Feature not supported for single-value columns with range index version < 2. Cannot disable "
+              + "forward index for column %s. Either disable range index or create range index with version >= 2 to "
+              + "use this feature", columnName);
+    }
+  }
+
/**
* Returns true if dictionary should be created for a column, false otherwise.
* Currently there are two sources for this config:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java
new file mode 100644
index 0000000000..bc990ac616
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * {@link ForwardIndexCreator} for dictionary-encoded single-value and multi-value columns whose forward index is
+ * disabled. Every value handed to it is discarded, and closing it releases nothing.
+ */
+public class NoOpForwardIndexCreator implements ForwardIndexCreator {
+  // true when the column this creator stands in for is single-value, false when it is multi-value
+  private final boolean _singleValueColumn;
+
+  public NoOpForwardIndexCreator(boolean isSingleValue) {
+    _singleValueColumn = isSingleValue;
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    // Always reports dictionary encoding; values arrive as dictionary ids via putDictId / putDictIdMV.
+    return true;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return _singleValueColumn;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    // The values accepted here are int dictionary ids.
+    return FieldSpec.DataType.INT;
+  }
+
+  @Override
+  public void putDictId(int dictId) {
+    // Intentionally discarded: no forward index is written.
+  }
+
+  @Override
+  public void putDictIdMV(int[] dictIds) {
+    // Intentionally discarded: no forward index is written.
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Nothing is opened by this creator, so there is nothing to release.
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index beeb768a5c..3a32aa8ada 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -133,7 +133,10 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
_rangeIndex = null;
}
- PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX);
+ // Setting the 'fwdIndexBuffer' to null if forward index is disabled
+ PinotDataBuffer fwdIndexBuffer = segmentReader.hasIndexFor(columnName, ColumnIndexType.FORWARD_INDEX)
+ ? segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX) : null;
+
if (metadata.hasDictionary()) {
// Dictionary-based index
_dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata,
@@ -142,6 +145,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
// Single-value
if (metadata.isSorted()) {
// Sorted
+ // No need to check for null 'fwdIndexBuffer' as for sorted columns this is a no-op
SortedIndexReader<?> sortedIndexReader = indexReaderProvider.newSortedIndexReader(fwdIndexBuffer, metadata);
_forwardIndex = sortedIndexReader;
_invertedIndex = sortedIndexReader;
@@ -149,7 +153,12 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
return;
}
}
- _forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
+ if (fwdIndexBuffer != null) {
+ _forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
+ } else {
+ // Forward index disabled
+ _forwardIndex = null;
+ }
if (loadInvertedIndex) {
_invertedIndex = indexReaderProvider.newInvertedIndexReader(
segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), metadata);
@@ -266,7 +275,9 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
@Override
public void close()
throws IOException {
- _forwardIndex.close();
+ if (_forwardIndex != null) {
+ _forwardIndex.close();
+ }
if (_invertedIndex != null) {
_invertedIndex.close();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
index 828b7e4b51..cb40a4a453 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
@@ -45,7 +45,7 @@ public abstract class BaseDataSource implements DataSource {
private final BloomFilterReader _bloomFilter;
private final NullValueVectorReader _nullValueVector;
- public BaseDataSource(DataSourceMetadata dataSourceMetadata, ForwardIndexReader<?> forwardIndex,
+ public BaseDataSource(DataSourceMetadata dataSourceMetadata, @Nullable ForwardIndexReader<?> forwardIndex,
@Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex,
@Nullable RangeIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex,
@Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, @Nullable H3IndexReader h3Index,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index d83ebbfe58..1d798b5101 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -73,6 +73,7 @@ public class IndexLoadingConfig {
private final Map<String, String> _noDictionaryConfig = new HashMap<>();
private final Set<String> _varLengthDictionaryColumns = new HashSet<>();
private Set<String> _onHeapDictionaryColumns = new HashSet<>();
+ private Set<String> _forwardIndexDisabledColumns = new HashSet<>();
private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>();
private boolean _enableDynamicStarTreeCreation;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
@@ -169,6 +170,7 @@ public class IndexLoadingConfig {
extractTextIndexColumnsFromTableConfig(tableConfig);
extractFSTIndexColumnsFromTableConfig(tableConfig);
extractH3IndexConfigsFromTableConfig(tableConfig);
+ extractForwardIndexDisabledColumnsFromTableConfig(tableConfig);
Map<String, List<TimestampIndexGranularity>> timestampIndexConfigs =
SegmentGeneratorConfig.extractTimestampIndexConfigsFromTableConfig(tableConfig);
@@ -329,6 +331,30 @@ public class IndexLoadingConfig {
_segmentDirectoryLoader = instanceDataManagerConfig.getSegmentDirectoryLoader();
}
+  /**
+   * Collects the columns whose forward index is disabled. The flag for each column is read from the properties bag of
+   * its {@link FieldConfig} in the table config's field config list (falling back to
+   * {@link FieldConfig#DEFAULT_FORWARD_INDEX_DISABLED}); every column whose flag parses to {@code true} is added to
+   * the forward index disabled columns.
+   *
+   * @param tableConfig table config
+   */
+  private void extractForwardIndexDisabledColumnsFromTableConfig(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigs = tableConfig.getFieldConfigList();
+    if (fieldConfigs == null) {
+      return;
+    }
+    for (FieldConfig fieldConfig : fieldConfigs) {
+      Map<String, String> properties = fieldConfig.getProperties();
+      if (properties == null) {
+        continue;
+      }
+      String flagValue =
+          properties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED);
+      if (Boolean.parseBoolean(flagValue)) {
+        _forwardIndexDisabledColumns.add(fieldConfig.getName());
+      }
+    }
+  }
+
/**
* For tests only.
*/
@@ -350,6 +376,19 @@ public class IndexLoadingConfig {
return _sortedColumns;
}
+  /**
+   * For tests only. Replaces the sorted columns with a fresh mutable list holding just {@code sortedColumn}, or with
+   * an empty list when {@code sortedColumn} is {@code null}.
+   */
+  @VisibleForTesting
+  public void setSortedColumn(String sortedColumn) {
+    if (sortedColumn == null) {
+      _sortedColumns = Collections.emptyList();
+      return;
+    }
+    List<String> singleSortedColumn = new ArrayList<>();
+    singleSortedColumn.add(sortedColumn);
+    _sortedColumns = singleSortedColumn;
+  }
+
public Set<String> getInvertedIndexColumns() {
return _invertedIndexColumns;
}
@@ -480,6 +519,14 @@ public class IndexLoadingConfig {
_onHeapDictionaryColumns = onHeapDictionaryColumns;
}
+  /**
+   * For tests only. Replaces the forward index disabled columns with the given set; the set is stored as-is, not
+   * copied.
+   */
+  @VisibleForTesting
+  public void setForwardIndexDisabledColumns(Set<String> forwardIndexDisabledColumns) {
+    this._forwardIndexDisabledColumns = forwardIndexDisabledColumns;
+  }
+
public Set<String> getNoDictionaryColumns() {
return _noDictionaryColumns;
}
@@ -496,7 +543,7 @@ public class IndexLoadingConfig {
return _compressionConfigs;
}
- public Map<String, String> getnoDictionaryConfig() {
+ public Map<String, String> getNoDictionaryConfig() {
return _noDictionaryConfig;
}
@@ -508,6 +555,10 @@ public class IndexLoadingConfig {
return _onHeapDictionaryColumns;
}
+  /**
+   * Returns the columns configured with the forward index disabled. The internal set is returned directly, not a copy.
+   */
+  public Set<String> getForwardIndexDisabledColumns() {
+    return this._forwardIndexDisabledColumns;
+  }
+
public Map<String, BloomFilterConfig> getBloomFilterConfigs() {
return _bloomFilterConfigs;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index ed09a6f299..db234a817a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -42,6 +42,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCrea
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
@@ -224,6 +225,22 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
if (columnMetadata != null) {
// Column exists in the segment, check if we need to update the value.
+ if (_segmentWriter != null && !columnMetadata.isAutoGenerated()) {
+ // Check that forward index disabled isn't enabled / disabled on an existing column (not auto-generated).
+ // TODO: Add support for reloading segments when forward index disabled flag is enabled or disabled
+ boolean forwardIndexDisabled = !_segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX);
+ if (forwardIndexDisabled != _indexLoadingConfig.getForwardIndexDisabledColumns()
+ .contains(column)) {
+ String failureMessage =
+ "Forward index disabled in segment: " + forwardIndexDisabled + " for column: " + column
+ + " does not match forward index disabled flag: "
+ + _indexLoadingConfig.getForwardIndexDisabledColumns().contains(column) + " in the TableConfig, "
+ + "setting this flag on new columns or updating this flag is not supported at the moment. Please "
+ + "backfill or refresh segments to use this feature.";
+ throw new RuntimeException(failureMessage);
+ }
+ }
+
// Only check for auto-generated column.
if (!columnMetadata.isAutoGenerated()) {
continue;
@@ -363,6 +380,13 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
argument);
return false;
}
+ // TODO: Support creation of derived columns from forward index disabled columns
+ if (!_segmentWriter.hasIndexFor(argument, ColumnIndexType.FORWARD_INDEX)) {
+ throw new UnsupportedOperationException(String.format("Operation not supported! Cannot create a derived "
+ + "column %s because argument: %s does not have a forward index. Enable forward index and "
+ + "refresh/backfill the segments to create a derived column from source column %s", column, argument,
+ argument));
+ }
argumentsMetadata.add(columnMetadata);
}
@@ -372,6 +396,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
return false;
}
+ // TODO: Support forward index disabled derived column
+ if (_indexLoadingConfig.getForwardIndexDisabledColumns().contains(column)) {
+ LOGGER.warn("Skip creating forward index disabled derived column: {}", column);
+ return false;
+ }
+
try {
createDerivedColumnV1Indices(column, functionEvaluator, argumentsMetadata);
return true;
@@ -388,8 +418,46 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
return true;
}
+  /**
+   * Validates the compatibility of the indexes if the column has the forward index disabled. Does nothing when
+   * {@code forwardIndexDisabled} is false. Otherwise it first logs a warning that disabling the forward index on a new
+   * column is not supported yet (the flag is treated as a no-op), then throws on the first compatibility mismatch.
+   * The checks performed are:
+   * - Validate inverted index is enabled.
+   * - Validate the column is not configured as a no-dictionary column.
+   * - Validate that either no range index exists for column or the range index version is
+   *   {@link BitSlicedRangeIndexCreator#VERSION} and the column isn't multi-value (since multi-value defaults to
+   *   index v1).
+   *
+   * @param column name of the column
+   * @param forwardIndexDisabled whether the forward index is disabled for the column
+   * @throws IllegalStateException if the forward index is disabled and another index config is incompatible with it
+   */
+  protected void validateForwardIndexDisabledConfigsIfPresent(String column, boolean forwardIndexDisabled) {
+    if (!forwardIndexDisabled) {
+      return;
+    }
+    LOGGER.warn("Disabling forward index on a new column {} is currently not supported. Treating this as a no-op!",
+        column);
+    // Guava message templates defer building the error text until a check actually fails.
+    Preconditions.checkState(_indexLoadingConfig.getInvertedIndexColumns().contains(column),
+        "Inverted index must be enabled for forward index disabled column: %s", column);
+    Preconditions.checkState(!_indexLoadingConfig.getNoDictionaryColumns().contains(column),
+        "Dictionary disabled column: %s cannot disable the forward index", column);
+    if (_indexLoadingConfig.getRangeIndexColumns() != null
+        && _indexLoadingConfig.getRangeIndexColumns().contains(column)) {
+      // The field spec is only needed to tell single-value from multi-value range-indexed columns.
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec.isSingleValueField(),
+          "Multi-value column with range index: %s cannot disable the forward index", column);
+      Preconditions.checkState(_indexLoadingConfig.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
+          "Single-value column with range index version < 2: %s cannot disable the forward index", column);
+    }
+  }
+
+  /**
+   * Returns whether the index loading config lists {@code column} as forward index disabled. A {@code null} set of
+   * forward index disabled columns means no column is disabled.
+   */
+  protected boolean isForwardIndexDisabled(String column) {
+    if (_indexLoadingConfig.getForwardIndexDisabledColumns() == null) {
+      return false;
+    }
+    return _indexLoadingConfig.getForwardIndexDisabledColumns().contains(column);
+  }
+
/**
* Helper method to create the V1 indices (dictionary and forward index) for a column with default values.
+ * TODO: Add support for handling the forwardIndexDisabled flag. Today this flag is ignored for default columns
*/
private void createDefaultValueColumnV1Indices(String column)
throws Exception {
@@ -402,6 +470,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
boolean isSingleValue = fieldSpec.isSingleValueField();
int maxNumberOfMultiValueElements = isSingleValue ? 0 : 1;
+ // Validate that the forwardIndexDisabled flag, if enabled, is compatible with other indexes and configs
+ // For now the forwardIndexDisabled flag is ignored for default columns but will be handled as part of reload
+ // changes
+ boolean forwardIndexDisabled = isForwardIndexDisabled(column);
+ validateForwardIndexDisabledConfigsIfPresent(column, forwardIndexDisabled);
+
Object sortedArray;
switch (dataType.getStoredType()) {
case INT:
@@ -465,7 +539,10 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
}
}
} else {
+ // TODO: Add support to disable the forward index if the forwardIndexDisabled flag is true and the column is a
+ // multi-value column.
// Multi-value column.
+
try (
MultiValueUnsortedForwardIndexCreator mvFwdIndexCreator = new MultiValueUnsortedForwardIndexCreator(_indexDir,
fieldSpec.getName(), 1/*cardinality*/, totalDocs/*numDocs*/, totalDocs/*totalNumberOfValues*/)) {
@@ -486,6 +563,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
* TODO:
* - Support chained derived column
* - Support raw derived column
+ * - Support forward index disabled derived column
*/
private void createDerivedColumnV1Indices(String column, FunctionEvaluator functionEvaluator,
List<ColumnMetadata> argumentsMetadata)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
index b783dc1f0f..7a6f153768 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
@@ -172,6 +172,7 @@ public final class FixedBitMVForwardIndexReader implements ForwardIndexReader<Fi
return dictIdBuffer;
}
+ @Override
public int getNumValuesMV(int docId, Context context) {
int contextDocId = context._docId;
int contextEndOffset = context._endOffset;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 87f2149d35..a510bfd128 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -42,6 +42,7 @@ public class PinotSegmentColumnReader implements Closeable {
DataSource dataSource = indexSegment.getDataSource(column);
Preconditions.checkArgument(dataSource != null, "Failed to find data source for column: %s", column);
_forwardIndexReader = dataSource.getForwardIndex();
+ Preconditions.checkArgument(_forwardIndexReader != null, "Forward index disabled for column: %s", column);
_forwardIndexReaderContext = _forwardIndexReader.createContext();
_dictionary = dataSource.getDictionary();
_nullValueVectorReader = dataSource.getNullValueVector();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
index c966e8318a..899d736542 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
@@ -128,6 +128,7 @@ public class StarTreeLoaderUtils {
throws IOException {
// NOTE: Close the indexes managed by the star-tree (dictionary is managed inside the ColumnIndexContainer).
for (DataSource dataSource : dataSourceMap.values()) {
+ // Forward index cannot be null here
dataSource.getForwardIndex().close();
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index ed6e4d9095..2f15d9b2b4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -847,6 +848,7 @@ public final class TableConfigUtils {
* Validates the Field Config List in the given TableConfig
* Ensures that every referred column name exists in the corresponding schema
* Additional checks for TEXT and FST index types
+ * Validates index compatibility for forward index disabled columns
*/
private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList,
@Nullable IndexingConfig indexingConfigs, @Nullable Schema schema) {
@@ -874,6 +876,10 @@ public final class TableConfigUtils {
default:
break;
}
+
+ // Validate the forward index disabled compatibility with other indexes if enabled for this column
+ validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfigs, noDictionaryColumns,
+ schema);
}
if (CollectionUtils.isNotEmpty(fieldConfig.getIndexTypes())) {
@@ -902,6 +908,44 @@ public final class TableConfigUtils {
}
}
+  /**
+   * Validates the compatibility of the indexes if the column has the forward index disabled. Does nothing unless the
+   * field config's {@link FieldConfig#FORWARD_INDEX_DISABLED} property (defaulting to
+   * {@link FieldConfig#DEFAULT_FORWARD_INDEX_DISABLED}) parses to {@code true}. Throws exceptions due to
+   * compatibility mismatch. The checks performed are:
+   * - Validate dictionary is enabled: the encoding type must not be RAW and the column must not be listed in
+   *   {@code noDictionaryColumns}.
+   * - Validate inverted index is enabled.
+   * - Validate that either no range index exists for column or the range index version is
+   *   {@link BitSlicedRangeIndexCreator#VERSION} and the column isn't multi-value (since multi-value defaults to
+   *   index v1).
+   *
+   * @param columnName name of the column being validated
+   * @param fieldConfig field config of the column
+   * @param indexingConfigs indexing config of the table; {@code null} is treated as "no indexes configured"
+   * @param noDictionaryColumns columns configured without a dictionary, may be {@code null}
+   * @param schema schema used to look up the column's field spec
+   * @throws IllegalStateException if the forward index is disabled and another index config is incompatible with it
+   */
+  private static void validateForwardIndexDisabledIndexCompatibility(String columnName, FieldConfig fieldConfig,
+      IndexingConfig indexingConfigs, List<String> noDictionaryColumns, Schema schema) {
+    Map<String, String> fieldConfigProperties = fieldConfig.getProperties();
+    if (fieldConfigProperties == null) {
+      return;
+    }
+
+    boolean forwardIndexDisabled = Boolean.parseBoolean(fieldConfigProperties
+        .getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+    if (!forwardIndexDisabled) {
+      return;
+    }
+
+    // A column ends up without a dictionary if EITHER its encoding type is RAW OR it is listed as a no-dictionary
+    // column, so both must be ruled out (the previous `||` form only rejected columns matching both conditions).
+    boolean rawEncoding = fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW;
+    boolean listedAsNoDictionary = noDictionaryColumns != null && noDictionaryColumns.contains(columnName);
+    Preconditions.checkState(!rawEncoding && !listedAsNoDictionary,
+        "Forward index disabled column %s must have dictionary enabled", columnName);
+
+    // indexingConfigs is @Nullable in validateFieldConfigList; report the intended error rather than an NPE.
+    Preconditions.checkState(indexingConfigs != null && indexingConfigs.getInvertedIndexColumns() != null
+            && indexingConfigs.getInvertedIndexColumns().contains(columnName),
+        "Forward index disabled column %s must have inverted index enabled", columnName);
+    if (indexingConfigs.getRangeIndexColumns() != null && indexingConfigs.getRangeIndexColumns().contains(columnName)) {
+      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+      Preconditions.checkState(fieldSpec.isSingleValueField(), "Feature not supported for multi-value "
+          + "columns with range index. Cannot disable forward index for column %s. Disable range index on this "
+          + "column to use this feature", columnName);
+      Preconditions.checkState(indexingConfigs.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
+          "Feature not supported for single-value columns with range index version < 2. Cannot disable "
+              + "forward index for column %s. Either disable range index or create range index with"
+              + " version >= 2 to use this feature", columnName);
+    }
+  }
+
private static void sanitize(TableConfig tableConfig) {
tableConfig.setIndexingConfig(sanitizeIndexingConfig(tableConfig.getIndexingConfig()));
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index cc00e6ad97..0c09383679 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -20,11 +20,13 @@ package org.apache.pinot.segment.local.segment.index.loader;
import java.io.File;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
@@ -33,6 +35,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriver
import org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -62,6 +65,7 @@ import org.testng.annotations.Test;
import org.testng.collections.Lists;
import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -75,6 +79,7 @@ public class LoaderTest {
private static final String TEXT_INDEX_COL_NAME = "column5";
private static final String FST_INDEX_COL_NAME = "column5";
+ private static final String NO_FORWARD_INDEX_COL_NAME = "column4";
private File _avroFile;
private File _indexDir;
@@ -457,6 +462,160 @@ public class LoaderTest {
indexSegment.destroy();
}
+  private void constructSegmentWithForwardIndexDisabled(SegmentVersion segmentVersion, boolean enableInvertedIndex)
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    SegmentGeneratorConfig segmentGeneratorConfig =
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, INDEX_DIR, "testTable");
+    // Only NO_FORWARD_INDEX_COL_NAME gets its forward index disabled; the inverted index is optional per caller
+    segmentGeneratorConfig.setForwardIndexDisabledColumns(new ArrayList<>(Arrays.asList(NO_FORWARD_INDEX_COL_NAME)));
+    if (enableInvertedIndex) {
+      segmentGeneratorConfig.createInvertedIndexForColumn(NO_FORWARD_INDEX_COL_NAME);
+    }
+    segmentGeneratorConfig.setSegmentVersion(segmentVersion);
+
+    // Build the segment and point _indexDir at the generated segment directory
+    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    _indexDir = new File(INDEX_DIR, driver.getSegmentName());
+  }
+
+  @Test
+  public void testForwardIndexDisabledLoad()
+      throws Exception {
+    // Tests for scenarios by creating on-disk segment in V3 and then loading
+    // the segment with and without specifying segmentVersion in IndexLoadingConfig
+
+    // create on-disk segment in V3
+    // this generates the segment in V1 but converts to V3 as part of post-creation processing
+    constructSegmentWithForwardIndexDisabled(SegmentVersion.v3, true);
+
+    // check that segment on-disk version is V3 after creation
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v3);
+    // check that V3 index sub-dir exists
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+
+    // CASE 1: don't set the segment version to load in IndexLoadingConfig
+    // there should be no conversion done by ImmutableSegmentLoader and it should
+    // be able to create all index readers with on-disk version V3
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NO_FORWARD_INDEX_COL_NAME);
+    indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    ImmutableSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+    indexSegment.destroy();
+
+    // CASE 2: set the segment version to load in IndexLoadingConfig as V3
+    // there should be no conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is the same as the version of the segment on disk (V3)
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+    indexSegment.destroy();
+
+    // Test for scenarios by creating on-disk segment in V1 and then loading
+    // the segment with and without specifying segmentVersion in IndexLoadingConfig
+
+    // create on-disk segment in V1
+    // this generates the segment in V1 and does not convert to V3 as part of post-creation processing
+    constructSegmentWithForwardIndexDisabled(SegmentVersion.v1, true);
+
+    // check that segment on-disk version is V1 after creation
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v1);
+    // check that segment v1 dir exists
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    // check that v3 index sub-dir does not exist
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+
+    // CASE 1: don't set the segment version to load in IndexLoadingConfig
+    // there should be no conversion done by ImmutableSegmentLoader and it should
+    // be able to create all index readers with on-disk version V1
+    indexLoadingConfig = new IndexLoadingConfig();
+    forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NO_FORWARD_INDEX_COL_NAME);
+    indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v1
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    indexSegment.destroy();
+
+    // CASE 2: set the segment version to load in IndexLoadingConfig to V1
+    // there should be no conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is the same as the version of the segment on disk (V1)
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v1
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    indexSegment.destroy();
+
+    // CASE 3: set the segment version to load in IndexLoadingConfig to V3
+    // there should be conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is different than the version of segment on disk
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // the index dir should exist in v3 format due to conversion
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    verifyIndexDirIsV3(_indexDir);
+    indexSegment.destroy();
+
+    // Test scenarios to create a column with no forward index but without enabling inverted index for it
+    try {
+      constructSegmentWithForwardIndexDisabled(SegmentVersion.v3, false);
+      Assert.fail("Disabling forward index without enabling inverted index is not allowed!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), String.format(
+          "Cannot disable forward index for column %s without inverted index enabled", NO_FORWARD_INDEX_COL_NAME));
+    }
+
+    try {
+      constructSegmentWithForwardIndexDisabled(SegmentVersion.v1, false);
+      Assert.fail("Disabling forward index without enabling inverted index is not allowed!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), String.format(
+          "Cannot disable forward index for column %s without inverted index enabled", NO_FORWARD_INDEX_COL_NAME));
+    }
+  }
+
private void constructSegmentWithTextIndex(SegmentVersion segmentVersion)
throws Exception {
FileUtils.deleteQuietly(INDEX_DIR);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 2bcad84f23..babbe646bc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -70,6 +70,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -100,6 +101,10 @@ public class SegmentPreProcessorTest {
// For create fst index tests
private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict";
+ // For create no forward index column tests
+ private static final String NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV = "newForwardIndexDisabledColumnSV";
+ private static final String NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV = "newForwardIndexDisabledColumnMV";
+
// For update default value tests.
private static final String NEW_COLUMNS_SCHEMA1 = "data/newColumnsSchema1.json";
private static final String NEW_COLUMNS_SCHEMA2 = "data/newColumnsSchema2.json";
@@ -107,6 +112,8 @@ public class SegmentPreProcessorTest {
private static final String NEW_COLUMNS_SCHEMA_WITH_FST = "data/newColumnsSchemaWithFST.json";
private static final String NEW_COLUMNS_SCHEMA_WITH_TEXT = "data/newColumnsSchemaWithText.json";
private static final String NEW_COLUMNS_SCHEMA_WITH_H3_JSON = "data/newColumnsSchemaWithH3Json.json";
+ private static final String NEW_COLUMNS_SCHEMA_WITH_NO_FORWARD_INDEX =
+ "data/newColumnsSchemaWithForwardIndexDisabled.json";
private static final String NEW_INT_METRIC_COLUMN_NAME = "newIntMetric";
private static final String NEW_LONG_METRIC_COLUMN_NAME = "newLongMetric";
private static final String NEW_FLOAT_METRIC_COLUMN_NAME = "newFloatMetric";
@@ -129,6 +136,7 @@ public class SegmentPreProcessorTest {
private Schema _newColumnsSchemaWithFST;
private Schema _newColumnsSchemaWithText;
private Schema _newColumnsSchemaWithH3Json;
+ private Schema _newColumnsSchemaWithForwardIndexDisabled;
@BeforeMethod
public void setUp()
@@ -186,6 +194,9 @@ public class SegmentPreProcessorTest {
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_H3_JSON);
assertNotNull(resourceUrl);
_newColumnsSchemaWithH3Json = Schema.fromFile(new File(resourceUrl.getFile()));
+ resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_NO_FORWARD_INDEX);
+ assertNotNull(resourceUrl);
+ _newColumnsSchemaWithForwardIndexDisabled = Schema.fromFile(new File(resourceUrl.getFile()));
}
@AfterMethod
@@ -344,7 +355,7 @@ public class SegmentPreProcessorTest {
assertNotNull(columnMetadata);
checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
- 0, newCompressionType);
+ 0, newCompressionType, false);
// Test4: Change compression on RAW index column. Change another index on another column. Check correctness.
newCompressionType = ChunkCompressionType.ZSTANDARD;
@@ -361,7 +372,7 @@ public class SegmentPreProcessorTest {
checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
// Check forward index.
validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
- 0, newCompressionType);
+ 0, newCompressionType, false);
}
/**
@@ -463,14 +474,14 @@ public class SegmentPreProcessorTest {
boolean isSorted, int dictionaryElementSize)
throws Exception {
createAndValidateIndex(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true,
- isSorted, dictionaryElementSize, true, 0, null);
+ isSorted, dictionaryElementSize, true, 0, null, false);
}
private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
throws Exception {
createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
- hasDictionary, isSorted, dictionaryElementSize, true, 0, null);
+ hasDictionary, isSorted, dictionaryElementSize, true, 0, null, false);
}
private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
@@ -478,7 +489,7 @@ public class SegmentPreProcessorTest {
int maxNumberOfMultiValues)
throws Exception {
createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
- hasDictionary, isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues, null);
+ hasDictionary, isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues, null, false);
}
private void checkForwardIndexCreation(String column, int cardinality, int bits, Schema schema,
@@ -486,12 +497,13 @@ public class SegmentPreProcessorTest {
ChunkCompressionType expectedCompressionType)
throws Exception {
createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, column, cardinality, bits, schema, isAutoGenerated,
- hasDictionary, isSorted, dictionaryElementSize, true, 0, expectedCompressionType);
+ hasDictionary, isSorted, dictionaryElementSize, true, 0, expectedCompressionType, false);
}
private void createAndValidateIndex(ColumnIndexType indexType, String column, int cardinality, int bits,
Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
- boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
+ boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType,
+ boolean forwardIndexDisabled)
throws Exception {
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -500,13 +512,14 @@ public class SegmentPreProcessorTest {
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
processor.process();
validateIndex(indexType, column, cardinality, bits, schema, isAutoGenerated, hasDictionary, isSorted,
- dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, expectedCompressionType);
+ dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, expectedCompressionType, forwardIndexDisabled);
}
}
private void validateIndex(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
- boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
+ boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType,
+ boolean forwardIndexDisabled)
throws Exception {
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
@@ -525,8 +538,18 @@ public class SegmentPreProcessorTest {
.load(_indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
- assertTrue(reader.hasIndexFor(column, indexType));
- assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+ if (indexType != ColumnIndexType.FORWARD_INDEX || !forwardIndexDisabled) {
+ assertTrue(reader.hasIndexFor(column, indexType));
+ }
+ if (isSingleValued || !forwardIndexDisabled) {
+ assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+ if (isSingleValued && forwardIndexDisabled) {
+ assertFalse(reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX));
+ }
+ } else {
+ // Default SV columns will have the default sorted forward index so only assert for MV columns here
+ assertFalse(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+ }
// Check if the raw forward index compressionType is correct.
if (expectedCompressionType != null) {
@@ -1454,4 +1477,312 @@ public class SegmentPreProcessorTest {
new HashSet<>(Collections.singletonList("newColumnX"))));
return testCases;
}
+
+  /**
+   * Test to check the behavior of the forward index disabled feature when enabled on a new SV column
+   */
+  @Test
+  public void testForwardIndexDisabledOnNewColumnsSV()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, then add a new column with the forward index disabled
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    // Forward index is always going to be present for default SV columns with forward index disabled. This is because
+    // such default columns are going to be sorted and the forwardIndexDisabled flag is a no-op for sorted columns
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, true);
+
+    // Create a segment in V1, then add a new column with the forward index disabled
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    // Forward index is always going to be present for default SV columns with forward index disabled. This is because
+    // such default columns are going to be sorted and the forwardIndexDisabled flag is a no-op for sorted columns
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, true);
+
+    // Add the column to the no dictionary column list; disabling the forward index then must fail
+    Set<String> existingNoDictionaryColumns = _indexLoadingConfig.getNoDictionaryColumns();
+    _indexLoadingConfig.setNoDictionaryColumns(forwardIndexDisabledColumns);
+
+    // Create a segment in V3, then add a new column with the forward index disabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+          NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+          4, true, 0, null, false);
+      Assert.fail("Forward index cannot be disabled for dictionary disabled columns!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnSV cannot disable the "
+          + "forward index");
+    }
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, true, 0, null, false);
+      Assert.fail("Forward index cannot be disabled for dictionary disabled columns!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnSV cannot disable the "
+          + "forward index");
+    }
+
+    // Reset the no dictionary columns
+    _indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
+
+    // Add the column to the sorted list
+    List<String> existingSortedColumns = _indexLoadingConfig.getSortedColumns();
+    _indexLoadingConfig.setSortedColumn(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+
+    // Create a segment in V3, then add a new column with the forward index disabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    // Column should be sorted and should be created successfully since the forwardIndexDisabled flag is a no-op
+    // for sorted columns
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, true,
+        4, true, 0, null, false);
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    // Column should be sorted and should be created successfully since the forwardIndexDisabled flag is a no-op
+    // for sorted columns
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, true,
+        4, true, 0, null, false);
+
+    // Reset the sorted column list
+    _indexLoadingConfig.setSortedColumn(existingSortedColumns.isEmpty() ? null : existingSortedColumns.get(0));
+
+    // Remove the column from the inverted index column list and validate that it fails
+    invertedIndexColumns.removeAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, then add a new column with the forward index disabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false);
+      Assert.fail("Forward index cannot be disabled without enabling the inverted index!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
+          + "newForwardIndexDisabledColumnSV");
+    }
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false);
+      Assert.fail("Forward index cannot be disabled without enabling the inverted index!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
+          + "newForwardIndexDisabledColumnSV");
+    }
+
+    _indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
+  }
+
+  /**
+   * Test to check the behavior of the forward index disabled feature when enabled on a new MV column.
+   * TODO: Add support for handling the forwardIndexDisabled flag on the reload path, then enable and fix this test.
+   */
+  @Test(enabled = false)
+  public void testForwardIndexDisabledOnNewColumnsMV()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, then add a new MV column with the forward index disabled
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    // Validate that the forward index doesn't exist and that inverted index does exist
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+    createAndValidateIndex(ColumnIndexType.INVERTED_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+
+    // Create a segment in V1, then add a new MV column with the forward index disabled
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    // Validate that the forward index doesn't exist and that inverted index does exist
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+    createAndValidateIndex(ColumnIndexType.INVERTED_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+
+    // Add the column to the no dictionary column list; the preprocessing below is expected to throw
+    Set<String> existingNoDictionaryColumns = _indexLoadingConfig.getNoDictionaryColumns();
+    _indexLoadingConfig.setNoDictionaryColumns(forwardIndexDisabledColumns);
+
+    // Create a segment in V3, then add a new MV column with the forward index disabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    // Reset the no dictionary columns
+    _indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
+
+    // Remove the column from the inverted index column list and validate that preprocessing throws
+    invertedIndexColumns.removeAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, then add a new MV column with the forward index disabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since the column is not part of the segment yet
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    _indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
+  }
+
+  /**
+   * Verifies that disabling the forward index on an existing dictionary encoded column makes preprocessing throw.
+   */
+  @Test
+  public void testForwardIndexDisabledOnExistingColumnDictEncoded()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(EXISTING_STRING_COL_DICT);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
+    assertNotNull(columnMetadata);
+    // try-with-resources so the segment directory is released even though process() is expected to throw
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+        SegmentPreProcessor v3Processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
+            _newColumnsSchemaWithForwardIndexDisabled)) {
+      expectThrows(RuntimeException.class, v3Processor::process);
+    }
+    constructV1Segment();
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+        SegmentPreProcessor v1Processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
+            _newColumnsSchemaWithForwardIndexDisabled)) {
+      expectThrows(RuntimeException.class, v1Processor::process);
+    }
+  }
+
+  /**
+   * Verifies that disabling the forward index on an existing raw column makes preprocessing throw.
+   */
+  @Test
+  public void testForwardIndexDisabledOnExistingColumnRaw()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(EXISTING_STRING_COL_RAW);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
+    assertNotNull(columnMetadata);
+    // try-with-resources so the segment directory is released even though process() is expected to throw
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+        SegmentPreProcessor v3Processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
+            _newColumnsSchemaWithForwardIndexDisabled)) {
+      expectThrows(RuntimeException.class, v3Processor::process);
+    }
+    constructV1Segment();
+    try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+        SegmentPreProcessor v1Processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig,
+            _newColumnsSchemaWithForwardIndexDisabled)) {
+      expectThrows(RuntimeException.class, v1Processor::process);
+    }
+  }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java
index beee1f8b73..47ea357894 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java
@@ -70,7 +70,7 @@ public class BaseDefaultColumnHandlerTest {
driver.build();
_segmentDirectory = new File(_indexDir, driver.getSegmentName());
_committedSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
- _writer = new SegmentLocalFSDirectory(_indexDir, _committedSegmentMetadata, ReadMode.mmap).createWriter();
+ _writer = new SegmentLocalFSDirectory(_segmentDirectory, _committedSegmentMetadata, ReadMode.mmap).createWriter();
}
@AfterMethod
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 0ebc6b9239..a563e9be21 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -895,6 +895,100 @@ public class TableConfigUtilsTest {
} catch (Exception e) {
Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type");
}
+
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(Arrays.asList("myCol1")).build();
+ try {
+ // Enable forward index disabled flag for a raw column
+ Map<String, String> fieldConfigProperties = new HashMap<>();
+ fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+ FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null,
+ fieldConfigProperties);
+ tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+ TableConfigUtils.validate(tableConfig, schema);
+ Assert.fail("Should fail for myCol1 without dictionary and with forward index disabled");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "Forward index disabled column myCol1 must have dictionary enabled");
+ }
+
+ try {
+ // Enable forward index disabled flag for a column without inverted index
+ Map<String, String> fieldConfigProperties = new HashMap<>();
+ fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+ FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null,
+ fieldConfigProperties);
+ tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+ TableConfigUtils.validate(tableConfig, schema);
+ Assert.fail("Should fail for conflicting myCol2 with forward index disabled but no inverted index");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "Forward index disabled column myCol2 must have inverted index enabled");
+ }
+
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(Arrays.asList("myCol1")).setInvertedIndexColumns(Arrays.asList("myCol2")).build();
+ try {
+ // Enable forward index disabled flag for a column with inverted index
+ Map<String, String> fieldConfigProperties = new HashMap<>();
+ fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+ FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
+ FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+ tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+ TableConfigUtils.validate(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.fail("Should not fail as myCol2 has forward index disabled but inverted index enabled");
+ }
+
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(Arrays.asList("myCol1")).setInvertedIndexColumns(Arrays.asList("myCol2"))
+ .setSortedColumn("myCol2").build();
+ try {
+ // Enable forward index disabled flag for a column with inverted index and is sorted
+ Map<String, String> fieldConfigProperties = new HashMap<>();
+ fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+ FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
+ FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+ tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+ TableConfigUtils.validate(tableConfig, schema);
+ } catch (Exception e) {
+ Assert.fail("Should not fail for myCol2 with forward index disabled but is sorted, this is a no-op");
+ }
+
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(Arrays.asList("myCol1")).setInvertedIndexColumns(Arrays.asList("myCol2"))
+ .setRangeIndexColumns(Arrays.asList("myCol2")).build();
+ try {
+ // Enable forward index disabled flag for a multi-value column with inverted index and range index
+ Map<String, String> fieldConfigProperties = new HashMap<>();
+ fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+ FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
+ FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
+ null, null, fieldConfigProperties);
+ tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+ TableConfigUtils.validate(tableConfig, schema);
+ Assert.fail("Should fail for MV myCol2 with forward index disabled but has range and inverted index");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "Feature not supported for multi-value columns with range index. "
+ + "Cannot disable forward index for column myCol2. Disable range index on this column to use this feature");
+ }
+
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setInvertedIndexColumns(Arrays.asList("myCol1")).setRangeIndexColumns(Arrays.asList("myCol1")).build();
+ try {
+ // Enable forward index disabled flag for a singe-value column with inverted index and range index v1
+ Map<String, String> fieldConfigProperties = new HashMap<>();
+ fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+ FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY,
+ FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
+ null, null, fieldConfigProperties);
+ tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+ tableConfig.getIndexingConfig().setRangeIndexVersion(1);
+ TableConfigUtils.validate(tableConfig, schema);
+ Assert.fail("Should fail for SV myCol1 with forward index disabled but has range v1 and inverted index");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "Feature not supported for single-value columns with range index version "
+ + "< 2. Cannot disable forward index for column myCol1. Either disable range index or create range index "
+ + "with version >= 2 to use this feature");
+ }
}
@Test
diff --git a/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithForwardIndexDisabled.json b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithForwardIndexDisabled.json
new file mode 100644
index 0000000000..83ac30f529
--- /dev/null
+++ b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithForwardIndexDisabled.json
@@ -0,0 +1,77 @@
+{
+ "schemaName": "testDataMV",
+ "dimensionFieldSpecs": [
+ {
+ "name": "column1",
+ "dataType": "INT"
+ },
+ {
+ "name": "column2",
+ "dataType": "INT"
+ },
+ {
+ "name": "column3",
+ "dataType": "STRING"
+ },
+ {
+ "name": "column4",
+ "dataType": "STRING"
+ },
+ {
+ "name": "column5",
+ "dataType": "STRING"
+ },
+ {
+ "name": "newForwardIndexDisabledColumnSV",
+ "dataType": "STRING"
+ },
+ {
+ "name": "newForwardIndexDisabledColumnMV",
+ "dataType": "STRING",
+ "singleValueField": false
+ },
+ {
+ "name": "column6",
+ "dataType": "INT",
+ "singleValueField": false
+ },
+ {
+ "name": "column7",
+ "dataType": "INT",
+ "singleValueField": false
+ },
+ {
+ "name": "column8",
+ "dataType": "INT"
+ },
+ {
+ "name": "column9",
+ "dataType": "INT"
+ },
+ {
+ "name": "column10",
+ "dataType": "INT"
+ },
+ {
+ "name": "column13",
+ "dataType": "INT"
+ },
+ {
+ "name": "weeksSinceEpochSunday",
+ "dataType": "INT"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "count",
+ "dataType": "INT"
+ }
+ ],
+ "timeFieldSpec": {
+ "incomingGranularitySpec": {
+ "timeType": "DAYS",
+ "dataType": "INT",
+ "name": "daysSinceEpoch"
+ }
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 732ef48cec..b2e2d75c98 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -64,6 +64,8 @@ public interface IndexCreationContext {
Comparable<?> getMaxValue();
+ boolean forwardIndexDisabled();
+
final class Builder {
private File _indexDir;
private int _lengthOfLongestEntry;
@@ -78,6 +80,7 @@ public interface IndexCreationContext {
private boolean _hasDictionary = true;
private Comparable<?> _minValue;
private Comparable<?> _maxValue;
+ private boolean _forwardIndexDisabled;
public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo columnIndexCreationInfo) {
return withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
@@ -161,10 +164,15 @@ public interface IndexCreationContext {
return this;
}
+ public Builder withforwardIndexDisabled(boolean forwardIndexDisabled) {
+ _forwardIndexDisabled = forwardIndexDisabled;
+ return this;
+ }
+
public Common build() {
return new Common(Objects.requireNonNull(_indexDir), _lengthOfLongestEntry, _maxNumberOfMultiValueElements,
_maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec), _sorted, _cardinality,
- _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, _maxValue);
+ _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, _maxValue, _forwardIndexDisabled);
}
}
@@ -187,11 +195,13 @@ public interface IndexCreationContext {
private final boolean _hasDictionary;
private final Comparable<?> _minValue;
private final Comparable<?> _maxValue;
+ private final boolean _forwardIndexDisabled;
public Common(File indexDir, int lengthOfLongestEntry,
int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean onHeap,
FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries,
- int totalDocs, boolean hasDictionary, Comparable<?> minValue, Comparable<?> maxValue) {
+ int totalDocs, boolean hasDictionary, Comparable<?> minValue, Comparable<?> maxValue,
+ boolean forwardIndexDisabled) {
_indexDir = indexDir;
_lengthOfLongestEntry = lengthOfLongestEntry;
_maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
@@ -205,6 +215,7 @@ public interface IndexCreationContext {
_hasDictionary = hasDictionary;
_minValue = minValue;
_maxValue = maxValue;
+ _forwardIndexDisabled = forwardIndexDisabled;
}
public FieldSpec getFieldSpec() {
@@ -261,6 +272,11 @@ public interface IndexCreationContext {
return _maxValue;
}
+ @Override
+ public boolean forwardIndexDisabled() {
+ return _forwardIndexDisabled;
+ }
+
public BloomFilter forBloomFilter(BloomFilterConfig bloomFilterConfig) {
return new BloomFilter(this, bloomFilterConfig);
}
@@ -367,6 +383,11 @@ public interface IndexCreationContext {
public Comparable getMaxValue() {
return _delegate.getMaxValue();
}
+
+ @Override
+ public boolean forwardIndexDisabled() {
+ return _delegate.forwardIndexDisabled();
+ }
}
class BloomFilter extends Wrapper {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index b5539b8b17..da5fbc9dda 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -82,6 +82,7 @@ public class SegmentGeneratorConfig implements Serializable {
private final List<String> _textIndexCreationColumns = new ArrayList<>();
private final List<String> _fstIndexCreationColumns = new ArrayList<>();
private final Map<String, JsonIndexConfig> _jsonIndexConfigs = new HashMap<>();
+ private final List<String> _forwardIndexDisabledColumns = new ArrayList<>();
private final Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
private final Map<String, List<TimestampIndexGranularity>> _timestampIndexConfigs = new HashMap<>();
private final List<String> _columnSortOrder = new ArrayList<>();
@@ -232,6 +233,7 @@ public class SegmentGeneratorConfig implements Serializable {
extractFSTIndexColumnsFromTableConfig(tableConfig);
extractH3IndexConfigsFromTableConfig(tableConfig);
extractCompressionCodecConfigsFromTableConfig(tableConfig);
+ extractForwardIndexDisabledColumnsFromTableConfig(tableConfig);
_fstTypeForFSTIndex = indexingConfig.getFSTIndexType();
_nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
@@ -360,6 +362,30 @@ public class SegmentGeneratorConfig implements Serializable {
}
}
+ /**
+ * Forward index disabled info for each column is specified
+ * using {@link FieldConfig} model of indicating per column
+ * encoding and indexing information. Since SegmentGeneratorConfig
+ * is created from TableConfig, we extract the forward index disabled info
+ * from fieldConfigList in TableConfig via the properties bag.
+ * @param tableConfig table config
+ */
+ private void extractForwardIndexDisabledColumnsFromTableConfig(TableConfig tableConfig) {
+ List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+ if (fieldConfigList != null) {
+ for (FieldConfig fieldConfig : fieldConfigList) {
+ Map<String, String> fieldConfigProperties = fieldConfig.getProperties();
+ if (fieldConfigProperties != null) {
+ boolean forwardIndexDisabled = Boolean.parseBoolean(fieldConfigProperties
+ .getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+ if (forwardIndexDisabled) {
+ _forwardIndexDisabledColumns.add(fieldConfig.getName());
+ }
+ }
+ }
+ }
+ }
+
public Map<String, String> getCustomProperties() {
return _customProperties;
}
@@ -426,6 +452,10 @@ public class SegmentGeneratorConfig implements Serializable {
return _jsonIndexConfigs;
}
+ public List<String> getForwardIndexDisabledColumns() {
+ return _forwardIndexDisabledColumns;
+ }
+
public Map<String, H3IndexConfig> getH3IndexConfigs() {
return _h3IndexConfigs;
}
@@ -461,6 +491,20 @@ public class SegmentGeneratorConfig implements Serializable {
}
}
+ @VisibleForTesting
+ public void setRangeIndexCreationColumns(List<String> rangeIndexCreationColumns) {
+ if (rangeIndexCreationColumns != null) {
+ _rangeIndexCreationColumns.addAll(rangeIndexCreationColumns);
+ }
+ }
+
+ @VisibleForTesting
+ public void setForwardIndexDisabledColumns(List<String> forwardIndexDisabledColumns) {
+ if (forwardIndexDisabledColumns != null) {
+ _forwardIndexDisabledColumns.addAll(forwardIndexDisabledColumns);
+ }
+ }
+
@VisibleForTesting
public void setColumnProperties(Map<String, Map<String, String>> columnProperties) {
_columnProperties = columnProperties;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 82bae1b46c..08215a84b9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -47,6 +47,9 @@ public class FieldConfig extends BaseJsonConfig {
// "native" for native, default is Lucene
public static final String TEXT_FST_TYPE = "fstType";
public static final String TEXT_NATIVE_FST_LITERAL = "native";
+ // Config to disable forward index
+ public static final String FORWARD_INDEX_DISABLED = "forwardIndexDisabled";
+ public static final String DEFAULT_FORWARD_INDEX_DISABLED = Boolean.FALSE.toString();
private final String _name;
private final EncodingType _encodingType;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index 0367203eb9..fc937bc1d0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.segment.converter;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -295,7 +296,9 @@ public class DictionaryToRawIndexConverter {
private void convertOneColumn(IndexSegment segment, String column, File newSegment)
throws IOException {
DataSource dataSource = segment.getDataSource(column);
- ForwardIndexReader reader = dataSource.getForwardIndex();
+ ForwardIndexReader forwardIndexReader = dataSource.getForwardIndex();
+ Preconditions.checkState(forwardIndexReader != null,
+ "Forward index disabled for column: %s, cannot convert column!", column);
Dictionary dictionary = dataSource.getDictionary();
if (dictionary == null) {
@@ -317,36 +320,36 @@ public class DictionaryToRawIndexConverter {
try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider
.getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
- ForwardIndexReaderContext readerContext = reader.createContext()) {
+ ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
switch (storedType) {
case INT:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putInt(dictionary.getIntValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putInt(dictionary.getIntValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case LONG:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putLong(dictionary.getLongValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putLong(dictionary.getLongValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case FLOAT:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putFloat(dictionary.getFloatValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putFloat(dictionary.getFloatValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case DOUBLE:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putDouble(dictionary.getDoubleValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putDouble(dictionary.getDoubleValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case STRING:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putString(dictionary.getStringValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putString(dictionary.getStringValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
case BYTES:
for (int docId = 0; docId < numDocs; docId++) {
- rawIndexCreator.putBytes(dictionary.getBytesValue(reader.getDictId(docId, readerContext)));
+ rawIndexCreator.putBytes(dictionary.getBytesValue(forwardIndexReader.getDictId(docId, readerContext)));
}
break;
default:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org