You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/03 23:19:13 UTC
[pinot] branch master updated: Ensure min/max value generation in the segment metadata. (#10891)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ffbc5b0e52 Ensure min/max value generation in the segment metadata. (#10891)
ffbc5b0e52 is described below
commit ffbc5b0e52e2c8ba507f088ec1fd39996f22e8c1
Author: Abhishek Sharma <ab...@spothero.com>
AuthorDate: Mon Jul 3 19:19:07 2023 -0400
Ensure min/max value generation in the segment metadata. (#10891)
---
.../ColumnMinMaxValueGenerator.java | 287 +++++++++++++++++----
.../index/loader/SegmentPreProcessorTest.java | 9 +-
2 files changed, 245 insertions(+), 51 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 2298d72cd9..5cfa637ee2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
@@ -29,6 +30,11 @@ import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.StandardIndexes;
@@ -37,6 +43,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.ByteArray;
import static org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -119,65 +126,257 @@ public class ColumnMinMaxValueGenerator {
private boolean needAddColumnMinMaxValueForColumn(String columnName) {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName);
- return columnMetadata.hasDictionary() && columnMetadata.getMinValue() == null
- && columnMetadata.getMaxValue() == null && !columnMetadata.isMinMaxValueInvalid();
+ return columnMetadata.getMinValue() == null && columnMetadata.getMaxValue() == null // neither bound recorded yet
+ && !columnMetadata.isMinMaxValueInvalid(); // and not flagged invalid; having a dictionary is no longer required
}
private void addColumnMinMaxValueForColumn(String columnName)
throws Exception {
- // Skip column without dictionary or with min/max value already set
+ // Skip column with min/max value already set; dictionary and raw (no-dictionary) columns are both handled below
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName);
- if (!columnMetadata.hasDictionary() || columnMetadata.getMinValue() != null
- || columnMetadata.getMaxValue() != null) {
+ if (columnMetadata.getMinValue() != null || columnMetadata.getMaxValue() != null) {
return;
}
- PinotDataBuffer dictionaryBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary());
DataType dataType = columnMetadata.getDataType().getStoredType();
- int length = columnMetadata.getCardinality();
- switch (dataType) {
- case INT:
- try (IntDictionary intDictionary = new IntDictionary(dictionaryBuffer, length)) {
- SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
- intDictionary.getStringValue(0), intDictionary.getStringValue(length - 1));
- }
- break;
- case LONG:
- try (LongDictionary longDictionary = new LongDictionary(dictionaryBuffer, length)) {
+ if (columnMetadata.hasDictionary()) { // dictionary path: entries 0 and length - 1 are recorded as min and max
+ PinotDataBuffer dictionaryBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary());
+ int length = columnMetadata.getCardinality();
+ switch (dataType) {
+ case INT:
+ try (IntDictionary intDictionary = new IntDictionary(dictionaryBuffer, length)) {
+ SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+ intDictionary.getStringValue(0), intDictionary.getStringValue(length - 1));
+ }
+ break;
+ case LONG:
+ try (LongDictionary longDictionary = new LongDictionary(dictionaryBuffer, length)) {
+ SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+ longDictionary.getStringValue(0), longDictionary.getStringValue(length - 1));
+ }
+ break;
+ case FLOAT:
+ try (FloatDictionary floatDictionary = new FloatDictionary(dictionaryBuffer, length)) {
+ SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+ floatDictionary.getStringValue(0), floatDictionary.getStringValue(length - 1));
+ }
+ break;
+ case DOUBLE:
+ try (DoubleDictionary doubleDictionary = new DoubleDictionary(dictionaryBuffer, length)) {
+ SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+ doubleDictionary.getStringValue(0), doubleDictionary.getStringValue(length - 1));
+ }
+ break;
+ case STRING:
+ try (StringDictionary stringDictionary = new StringDictionary(dictionaryBuffer, length,
+ columnMetadata.getColumnMaxLength())) {
+ SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+ stringDictionary.getStringValue(0), stringDictionary.getStringValue(length - 1));
+ }
+ break;
+ case BYTES:
+ try (BytesDictionary bytesDictionary = new BytesDictionary(dictionaryBuffer, length,
+ columnMetadata.getColumnMaxLength())) {
+ SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+ bytesDictionary.getStringValue(0), bytesDictionary.getStringValue(length - 1));
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName);
+ }
+ } else {
+ // No dictionary: scan every value of the raw forward index (SV or MV reader) and track the running min/max.
+ int numDocs = columnMetadata.getTotalDocs(); // NOTE(review): with 0 docs the loops never run and min/max keep their initial values (null for STRING/BYTES) - confirm empty segments cannot reach here
+ boolean isSingleValueField = _segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField();
+ PinotDataBuffer forwardBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.forward());
+ switch (dataType) {
+ case INT: {
+ int min = Integer.MAX_VALUE;
+ int max = Integer.MIN_VALUE;
+ if (isSingleValueField) {
+ try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+ forwardBuffer, DataType.INT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ int value = rawIndexReader.getInt(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ }
+ } else {
+ try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+ forwardBuffer, DataType.INT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ int[] value = rawIndexReader.getIntMV(docId, readerContext);
+ for (int i = 0; i < value.length; i++) {
+ min = Math.min(min, value[i]);
+ max = Math.max(max, value[i]);
+ }
+ }
+ }
+ }
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
- longDictionary.getStringValue(0), longDictionary.getStringValue(length - 1));
- }
- break;
- case FLOAT:
- try (FloatDictionary floatDictionary = new FloatDictionary(dictionaryBuffer, length)) {
+ String.valueOf(min), String.valueOf(max));
+ }
+ break;
+ case LONG: {
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ if (isSingleValueField) {
+ try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+ forwardBuffer, DataType.LONG); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ long value = rawIndexReader.getLong(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ }
+ } else {
+ try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+ forwardBuffer, DataType.LONG); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ long[] value = rawIndexReader.getLongMV(docId, readerContext);
+ for (int i = 0; i < value.length; i++) {
+ min = Math.min(min, value[i]);
+ max = Math.max(max, value[i]);
+ }
+ }
+ }
+ }
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
- floatDictionary.getStringValue(0), floatDictionary.getStringValue(length - 1));
- }
- break;
- case DOUBLE:
- try (DoubleDictionary doubleDictionary = new DoubleDictionary(dictionaryBuffer, length)) {
+ String.valueOf(min), String.valueOf(max));
+ }
+ break;
+ case FLOAT: {
+ float min = Float.POSITIVE_INFINITY; // identity for Math.min, so an all-Infinity column is still reported correctly
+ float max = Float.NEGATIVE_INFINITY; // not Float.MIN_VALUE: that is the smallest positive float, wrong for values <= 0
+ if (isSingleValueField) {
+ try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+ forwardBuffer, DataType.FLOAT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ float value = rawIndexReader.getFloat(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ }
+ } else {
+ try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+ forwardBuffer, DataType.FLOAT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ float[] value = rawIndexReader.getFloatMV(docId, readerContext);
+ for (int i = 0; i < value.length; i++) {
+ min = Math.min(min, value[i]);
+ max = Math.max(max, value[i]);
+ }
+ }
+ }
+ }
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
- doubleDictionary.getStringValue(0), doubleDictionary.getStringValue(length - 1));
- }
- break;
- case STRING:
- try (StringDictionary stringDictionary = new StringDictionary(dictionaryBuffer, length,
- columnMetadata.getColumnMaxLength())) {
+ String.valueOf(min), String.valueOf(max));
+ }
+ break;
+ case DOUBLE: {
+ double min = Double.POSITIVE_INFINITY; // identity for Math.min, so an all-Infinity column is still reported correctly
+ double max = Double.NEGATIVE_INFINITY; // not Double.MIN_VALUE: that is the smallest positive double, wrong for values <= 0
+ if (isSingleValueField) {
+ try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+ forwardBuffer, DataType.DOUBLE); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ double value = rawIndexReader.getDouble(docId, readerContext);
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ }
+ }
+ } else {
+ try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+ forwardBuffer, DataType.DOUBLE); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ double[] value = rawIndexReader.getDoubleMV(docId, readerContext);
+ for (int i = 0; i < value.length; i++) {
+ min = Math.min(min, value[i]);
+ max = Math.max(max, value[i]);
+ }
+ }
+ }
+ }
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
- stringDictionary.getStringValue(0), stringDictionary.getStringValue(length - 1));
- }
- break;
- case BYTES:
- try (BytesDictionary bytesDictionary = new BytesDictionary(dictionaryBuffer, length,
- columnMetadata.getColumnMaxLength())) {
+ String.valueOf(min), String.valueOf(max));
+ }
+ break;
+ case STRING: {
+ String min = null;
+ String max = null;
+ if (isSingleValueField) {
+ try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(forwardBuffer,
+ DataType.STRING); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ String value = rawIndexReader.getString(docId, readerContext);
+ if (min == null || StringUtils.compare(min, value) > 0) {
+ min = value;
+ }
+ if (max == null || StringUtils.compare(max, value) < 0) {
+ max = value;
+ }
+ }
+ }
+ } else {
+ try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(forwardBuffer,
+ DataType.STRING); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ String[] value = rawIndexReader.getStringMV(docId, readerContext);
+ for (int i = 0; i < value.length; i++) {
+ if (min == null || StringUtils.compare(min, value[i]) > 0) {
+ min = value[i];
+ }
+ if (max == null || StringUtils.compare(max, value[i]) < 0) {
+ max = value[i];
+ }
+ }
+ }
+ }
+ }
+ SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, min, max);
+ }
+ break;
+ case BYTES: {
+ byte[] min = null;
+ byte[] max = null;
+ if (isSingleValueField) {
+ try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(forwardBuffer,
+ DataType.BYTES); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ byte[] value = rawIndexReader.getBytes(docId, readerContext);
+ if (min == null || ByteArray.compare(value, min) < 0) { // value sorts before the current min
+ min = value;
+ }
+ if (max == null || ByteArray.compare(value, max) > 0) { // value sorts after the current max
+ max = value;
+ }
+ }
+ }
+ } else {
+ try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(forwardBuffer,
+ DataType.BYTES); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ for (int docId = 0; docId < numDocs; docId++) {
+ byte[][] value = rawIndexReader.getBytesMV(docId, readerContext);
+ for (int i = 0; i < value.length; i++) {
+ if (min == null || ByteArray.compare(value[i], min) < 0) { // value sorts before the current min
+ min = value[i];
+ }
+ if (max == null || ByteArray.compare(value[i], max) > 0) { // value sorts after the current max
+ max = value[i];
+ }
+ }
+ }
+ }
+ }
SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
- bytesDictionary.getStringValue(0), bytesDictionary.getStringValue(length - 1));
- }
- break;
- default:
- throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName);
+ String.valueOf(new ByteArray(min)), String.valueOf(new ByteArray(max)));
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName);
+ }
}
-
_minMaxValueAdded = true;
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 16d265c04d..0bdaefca0a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -1755,13 +1755,8 @@ public class SegmentPreProcessorTest {
}
segmentMetadata = new SegmentMetadataImpl(_indexDir);
segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
- if (v.hasDictionary()) {
- assertNotNull(v.getMinValue(), "checking column: " + k);
- assertNotNull(v.getMaxValue(), "checking column: " + k);
- } else {
- assertNull(v.getMinValue(), "checking column: " + k);
- assertNull(v.getMaxValue(), "checking column: " + k);
- }
+ assertNotNull(v.getMinValue(), "checking column: " + k);
+ assertNotNull(v.getMaxValue(), "checking column: " + k);
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org