You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/10/20 14:50:22 UTC

[GitHub] [pinot] kishoreg commented on a change in pull request #7595: MV fwd index + MV `BYTES`

kishoreg commented on a change in pull request #7595:
URL: https://github.com/apache/pinot/pull/7595#discussion_r732861201



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
##########
@@ -66,19 +68,21 @@
    * @param chunkSize Size of chunk
    * @param sizeOfEntry Size of entry (in bytes), max size for variable byte implementation.
    * @param version version of File
-   * @throws FileNotFoundException
+   * @throws IOException if the file isn't found or can't be mapped
    */
   protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
       int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
-      throws FileNotFoundException {
+      throws IOException {
     Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION);
+    _file = file;
+    _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
+    _dataOffset = headerSize(totalDocs, numDocsPerChunk, _headerEntryChunkOffsetSize);
     _chunkSize = chunkSize;
     _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
-    _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
-    _dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version);
     _chunkBuffer = ByteBuffer.allocateDirect(chunkSize);
-    _compressedBuffer = ByteBuffer.allocateDirect(chunkSize * 2);
-    _dataFile = new RandomAccessFile(file, "rw").getChannel();
+    _dataChannel = new RandomAccessFile(file, "rw").getChannel();
+    _header = _dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, _dataOffset);

Review comment:
       Where is `_compressedBuffer` getting initialized now, or do we not need it anymore?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
##########
@@ -452,12 +457,119 @@ public void indexRow(GenericRow row)
           }
         }
       } else {
-        // MV column (always dictionary encoded)
-        int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-        forwardIndexCreator.putDictIdMV(dictIds);
-        DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
-        if (invertedIndexCreator != null) {
-          invertedIndexCreator.add(dictIds, dictIds.length);
+        if (dictionaryCreator != null) {
+          //dictionary encoded
+          int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
+          forwardIndexCreator.putDictIdMV(dictIds);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+              .get(columnName);
+          if (invertedIndexCreator != null) {
+            invertedIndexCreator.add(dictIds, dictIds.length);
+          }
+        } else {
+          // for text index on raw columns, check the config to determine if actual raw value should
+          // be stored or not
+          if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
+            Object value = _columnProperties.get(columnName)
+                .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+            if (value == null) {
+              value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
+            }
+            if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
+              value = String.valueOf(value);
+              int length = ((String[]) columnValueToIndex).length;
+              columnValueToIndex = new String[length];
+              Arrays.fill((String[]) columnValueToIndex, value);
+            } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
+              int length = ((byte[][]) columnValueToIndex).length;
+              columnValueToIndex = new byte[length][];
+              Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes());
+            } else {
+              throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
+            }
+          }
+          switch (forwardIndexCreator.getValueType()) {
+            case INT:
+              if (columnValueToIndex instanceof int[]) {
+                forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                int[] array = new int[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putIntMV(array);
+              } else {
+                //TODO: is this possible?

Review comment:
       I looked at the code and it should not enter this path

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
##########
@@ -166,13 +172,15 @@ private int writeHeader(ChunkCompressionType compressionType, int totalDocs, int
    *
    */
   protected void writeChunk() {
-    int sizeToWrite;
+    int sizeWritten;
     _chunkBuffer.flip();
 
-    try {
-      sizeToWrite = _chunkCompressor.compress(_chunkBuffer, _compressedBuffer);
-      _dataFile.write(_compressedBuffer, _dataOffset);
-      _compressedBuffer.clear();
+    int maxCompressedSize = _chunkCompressor.maxCompressedSize(_chunkBuffer.limit());
+    // compress directly in to the mapped output rather keep a large buffer to compress into
+    try (PinotDataBuffer compressedBuffer = PinotDataBuffer.mapFile(_file, false, _dataOffset,

Review comment:
       I think the existing files were written using BIG_ENDIAN. If we are changing the byte order, we might need to bump the version and handle both cases for backward compatibility.
   
   It might be better to keep the same byte order as the existing one.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org