You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/03 23:19:13 UTC

[pinot] branch master updated: Ensure min/max value generation in the segment metadata. (#10891)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ffbc5b0e52 Ensure min/max value generation in the segment metadata.  (#10891)
ffbc5b0e52 is described below

commit ffbc5b0e52e2c8ba507f088ec1fd39996f22e8c1
Author: Abhishek Sharma <ab...@spothero.com>
AuthorDate: Mon Jul 3 19:19:07 2023 -0400

    Ensure min/max value generation in the segment metadata.  (#10891)
---
 .../ColumnMinMaxValueGenerator.java                | 287 +++++++++++++++++----
 .../index/loader/SegmentPreProcessorTest.java      |   9 +-
 2 files changed, 245 insertions(+), 51 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index 2298d72cd9..5cfa637ee2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
 import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
@@ -29,6 +30,11 @@ import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
@@ -37,6 +43,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.ByteArray;
 
 import static org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -119,65 +126,257 @@ public class ColumnMinMaxValueGenerator {
 
   private boolean needAddColumnMinMaxValueForColumn(String columnName) {
     ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName);
-    return columnMetadata.hasDictionary() && columnMetadata.getMinValue() == null
-        && columnMetadata.getMaxValue() == null && !columnMetadata.isMinMaxValueInvalid();
+    boolean minMaxMissing = columnMetadata.getMinValue() == null && columnMetadata.getMaxValue() == null;
+    return minMaxMissing && !columnMetadata.isMinMaxValueInvalid();
   }
 
   private void addColumnMinMaxValueForColumn(String columnName)
       throws Exception {
-    // Skip column without dictionary or with min/max value already set
+    // Skip columns whose min/max value is already set in the metadata
     ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName);
-    if (!columnMetadata.hasDictionary() || columnMetadata.getMinValue() != null
-        || columnMetadata.getMaxValue() != null) {
+    if (columnMetadata.getMinValue() != null || columnMetadata.getMaxValue() != null) {
       return;
     }
 
-    PinotDataBuffer dictionaryBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary());
     DataType dataType = columnMetadata.getDataType().getStoredType();
-    int length = columnMetadata.getCardinality();
-    switch (dataType) {
-      case INT:
-        try (IntDictionary intDictionary = new IntDictionary(dictionaryBuffer, length)) {
-          SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
-              intDictionary.getStringValue(0), intDictionary.getStringValue(length - 1));
-        }
-        break;
-      case LONG:
-        try (LongDictionary longDictionary = new LongDictionary(dictionaryBuffer, length)) {
+    if (columnMetadata.hasDictionary()) {
+      PinotDataBuffer dictionaryBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.dictionary());
+      int length = columnMetadata.getCardinality();
+      switch (dataType) {
+        case INT:
+          try (IntDictionary intDictionary = new IntDictionary(dictionaryBuffer, length)) {
+            SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+                intDictionary.getStringValue(0), intDictionary.getStringValue(length - 1));
+          }
+          break;
+        case LONG:
+          try (LongDictionary longDictionary = new LongDictionary(dictionaryBuffer, length)) {
+            SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+                longDictionary.getStringValue(0), longDictionary.getStringValue(length - 1));
+          }
+          break;
+        case FLOAT:
+          try (FloatDictionary floatDictionary = new FloatDictionary(dictionaryBuffer, length)) {
+            SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+                floatDictionary.getStringValue(0), floatDictionary.getStringValue(length - 1));
+          }
+          break;
+        case DOUBLE:
+          try (DoubleDictionary doubleDictionary = new DoubleDictionary(dictionaryBuffer, length)) {
+            SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+                doubleDictionary.getStringValue(0), doubleDictionary.getStringValue(length - 1));
+          }
+          break;
+        case STRING:
+          try (StringDictionary stringDictionary = new StringDictionary(dictionaryBuffer, length,
+              columnMetadata.getColumnMaxLength())) {
+            SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+                stringDictionary.getStringValue(0), stringDictionary.getStringValue(length - 1));
+          }
+          break;
+        case BYTES:
+          try (BytesDictionary bytesDictionary = new BytesDictionary(dictionaryBuffer, length,
+              columnMetadata.getColumnMaxLength())) {
+            SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
+                bytesDictionary.getStringValue(0), bytesDictionary.getStringValue(length - 1));
+          }
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName);
+      }
+    } else {
+      // Raw (non-dictionary) column: scan every value in the forward index to compute min/max.
+      int numDocs = columnMetadata.getTotalDocs();
+      boolean isSingleValueField = _segmentMetadata.getSchema().getFieldSpecFor(columnName).isSingleValueField();
+      PinotDataBuffer forwardBuffer = _segmentWriter.getIndexFor(columnName, StandardIndexes.forward());
+      switch (dataType) {
+        case INT: {
+          int min = Integer.MAX_VALUE;
+          int max = Integer.MIN_VALUE;
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.INT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                int value = rawIndexReader.getInt(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.INT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                int[] values = rawIndexReader.getIntMV(docId, readerContext);
+                for (int value : values) {
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+              }
+            }
+          }
           SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
-              longDictionary.getStringValue(0), longDictionary.getStringValue(length - 1));
-        }
-        break;
-      case FLOAT:
-        try (FloatDictionary floatDictionary = new FloatDictionary(dictionaryBuffer, length)) {
+              String.valueOf(min), String.valueOf(max));
+          break;
+        }
+        case LONG: {
+          long min = Long.MAX_VALUE;
+          long max = Long.MIN_VALUE;
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.LONG); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                long value = rawIndexReader.getLong(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.LONG); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                long[] values = rawIndexReader.getLongMV(docId, readerContext);
+                for (long value : values) {
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+              }
+            }
+          }
           SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
-              floatDictionary.getStringValue(0), floatDictionary.getStringValue(length - 1));
-        }
-        break;
-      case DOUBLE:
-        try (DoubleDictionary doubleDictionary = new DoubleDictionary(dictionaryBuffer, length)) {
+              String.valueOf(min), String.valueOf(max));
+          break;
+        }
+        case FLOAT: {
+          float min = Float.POSITIVE_INFINITY;
+          float max = Float.NEGATIVE_INFINITY; // not Float.MIN_VALUE, which is the smallest positive float
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.FLOAT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                float value = rawIndexReader.getFloat(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.FLOAT); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                float[] values = rawIndexReader.getFloatMV(docId, readerContext);
+                for (float value : values) {
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+              }
+            }
+          }
           SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
-              doubleDictionary.getStringValue(0), doubleDictionary.getStringValue(length - 1));
-        }
-        break;
-      case STRING:
-        try (StringDictionary stringDictionary = new StringDictionary(dictionaryBuffer, length,
-            columnMetadata.getColumnMaxLength())) {
+              String.valueOf(min), String.valueOf(max));
+          break;
+        }
+        case DOUBLE: {
+          double min = Double.POSITIVE_INFINITY;
+          double max = Double.NEGATIVE_INFINITY; // not Double.MIN_VALUE, which is the smallest positive double
+          if (isSingleValueField) {
+            try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+                forwardBuffer, DataType.DOUBLE); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                double value = rawIndexReader.getDouble(docId, readerContext);
+                min = Math.min(min, value);
+                max = Math.max(max, value);
+              }
+            }
+          } else {
+            try (FixedByteChunkMVForwardIndexReader rawIndexReader = new FixedByteChunkMVForwardIndexReader(
+                forwardBuffer, DataType.DOUBLE); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                double[] values = rawIndexReader.getDoubleMV(docId, readerContext);
+                for (double value : values) {
+                  min = Math.min(min, value);
+                  max = Math.max(max, value);
+                }
+              }
+            }
+          }
           SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
-              stringDictionary.getStringValue(0), stringDictionary.getStringValue(length - 1));
-        }
-        break;
-      case BYTES:
-        try (BytesDictionary bytesDictionary = new BytesDictionary(dictionaryBuffer, length,
-            columnMetadata.getColumnMaxLength())) {
+              String.valueOf(min), String.valueOf(max));
+          break;
+        }
+        case STRING: {
+          String min = null;
+          String max = null;
+          if (isSingleValueField) {
+            try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(forwardBuffer,
+                DataType.STRING); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                String value = rawIndexReader.getString(docId, readerContext);
+                if (min == null || StringUtils.compare(min, value) > 0) {
+                  min = value;
+                }
+                if (max == null || StringUtils.compare(max, value) < 0) {
+                  max = value;
+                }
+              }
+            }
+          } else {
+            try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(forwardBuffer,
+                DataType.STRING); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                String[] values = rawIndexReader.getStringMV(docId, readerContext);
+                for (String value : values) {
+                  if (min == null || StringUtils.compare(min, value) > 0) {
+                    min = value;
+                  }
+                  if (max == null || StringUtils.compare(max, value) < 0) {
+                    max = value;
+                  }
+                }
+              }
+            }
+          }
+          SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName, min, max);
+          break;
+        }
+        case BYTES: {
+          byte[] min = null;
+          byte[] max = null;
+          if (isSingleValueField) {
+            try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(forwardBuffer,
+                DataType.BYTES); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                byte[] value = rawIndexReader.getBytes(docId, readerContext);
+                if (min == null || ByteArray.compare(value, min) < 0) {
+                  min = value;
+                }
+                if (max == null || ByteArray.compare(value, max) > 0) {
+                  max = value;
+                }
+              }
+            }
+          } else {
+            try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(forwardBuffer,
+                DataType.BYTES); ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+              for (int docId = 0; docId < numDocs; docId++) {
+                byte[][] values = rawIndexReader.getBytesMV(docId, readerContext);
+                for (byte[] value : values) {
+                  if (min == null || ByteArray.compare(value, min) < 0) {
+                    min = value;
+                  }
+                  if (max == null || ByteArray.compare(value, max) > 0) {
+                    max = value;
+                  }
+                }
+              }
+            }
+          }
           SegmentColumnarIndexCreator.addColumnMinMaxValueInfo(_segmentProperties, columnName,
-              bytesDictionary.getStringValue(0), bytesDictionary.getStringValue(length - 1));
-        }
-        break;
-      default:
-        throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName);
+              String.valueOf(new ByteArray(min)), String.valueOf(new ByteArray(max)));
+          break;
+        }
+        default:
+          throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + columnName);
+      }
     }
-
     _minMaxValueAdded = true;
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 16d265c04d..0bdaefca0a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -1755,13 +1755,8 @@ public class SegmentPreProcessorTest {
     }
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
     segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
-      if (v.hasDictionary()) {
-        assertNotNull(v.getMinValue(), "checking column: " + k);
-        assertNotNull(v.getMaxValue(), "checking column: " + k);
-      } else {
-        assertNull(v.getMinValue(), "checking column: " + k);
-        assertNull(v.getMaxValue(), "checking column: " + k);
-      }
+      assertNotNull(v.getMinValue(), "checking column: " + k);
+      assertNotNull(v.getMaxValue(), "checking column: " + k);
     });
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org