You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/03 22:51:03 UTC

[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9810: Default column handling of noForwardIndex and regeneration of forward index on reload path

siddharthteotia commented on code in PR #9810:
URL: https://github.com/apache/pinot/pull/9810#discussion_r1038881456


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java:
##########
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.MAX_MULTI_VALUE_ELEMENTS;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.TOTAL_NUMBER_OF_ENTRIES;
+import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor;
+
+
+/**
+ * Helper class used by the {@link SegmentPreProcessor} to generate the forward index from inverted index and
+ * dictionary when the forward index is enabled for columns where it was previously disabled. This is also invoked by
+ * the {@link IndexHandler} code in scenarios where the forward index needs to be temporarily created to generate other
+ * indexes for the given column. In such cases the forward index will be cleaned up after the {@link IndexHandler} code
+ * completes.
+ *
+ * For multi-value columns the following invariants cannot be maintained:
+ * - Ordering of elements within a given multi-value row. This will always be a limitation.
+ * - Data loss is possible if there are repeats for elements within a given multi-value row. This limitation will be
+ *   addressed in a future change.
+ *
+ * TODO: Currently for multi-value columns generating the forward index can lead to a data loss as frequency information
+ *       is not available for repeats within a given row. This needs to be addressed by tracking the frequency data
+ *       as part of an on-disk structure when forward index is disabled for a column.
+ */
+public class InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoCloseable {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(InvertedIndexAndDictionaryBasedForwardIndexCreator.class);
+
+  // Use MMapBuffer if the value buffer size is larger than 2G
+  private static final int NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER = 500_000_000;
+
+  private static final String FORWARD_INDEX_VALUE_BUFFER_SUFFIX = ".fwd.idx.val.buf";
+  private static final String FORWARD_INDEX_LENGTH_BUFFER_SUFFIX = ".fwd.idx.len.buf";
+  private static final String FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX = ".fwd.idx.maxsize.buf";
+
+  private final String _columnName;
+  private final SegmentMetadata _segmentMetadata;
+  private final IndexLoadingConfig _indexLoadingConfig;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final IndexCreatorProvider _indexCreatorProvider;
+  private final boolean _isTemporaryForwardIndex;
+
+  // Metadata
+  private final ColumnMetadata _columnMetadata;
+  private final boolean _singleValue;
+  private final int _cardinality;
+  private final int _numDocs;
+  private final int _maxNumberOfMultiValues;
+  private final int _totalNumberOfEntries;
+  private final boolean _dictionaryEnabled;
+  private final ChunkCompressionType _chunkCompressionType;
+  private final boolean _useMMapBuffer;
+
+  // Files and temporary buffers
+  private final File _forwardIndexFile;
+  private final File _forwardIndexValueBufferFile;
+  private final File _forwardIndexLengthBufferFile;
+  private final File _forwardIndexMaxSizeBufferFile;
+
+  // Forward index buffers (to store the dictId at the correct docId)
+  private PinotDataBuffer _forwardIndexValueBuffer;
+  // For multi-valued column only because each docId can have multiple dictIds
+  private PinotDataBuffer _forwardIndexLengthBuffer;
+  private int _nextValueId;
+  // For multi-valued column only to track max row size
+  private PinotDataBuffer _forwardIndexMaxSizeBuffer;
+
+  public InvertedIndexAndDictionaryBasedForwardIndexCreator(String columnName, SegmentMetadata segmentMetadata,
+      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter,
+      IndexCreatorProvider indexCreatorProvider, boolean isTemporaryForwardIndex)
+      throws IOException { // thrown when the temporary buffers cannot be created (see the catch block below)
+    _columnName = columnName;
+    _segmentMetadata = segmentMetadata;
+    _indexLoadingConfig = indexLoadingConfig;
+    _segmentWriter = segmentWriter;
+    _indexCreatorProvider = indexCreatorProvider;
+    _isTemporaryForwardIndex = isTemporaryForwardIndex;
+
+    _columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
+    _singleValue = _columnMetadata.isSingleValue();
+    _cardinality = _columnMetadata.getCardinality();
+    _numDocs = _columnMetadata.getTotalDocs();
+    _totalNumberOfEntries = _columnMetadata.getTotalNumberOfEntries();
+    _maxNumberOfMultiValues = _columnMetadata.getMaxNumberOfMultiValues();
+    _dictionaryEnabled = !_indexLoadingConfig.getNoDictionaryColumns().contains(columnName);
+    _chunkCompressionType = getColumnCompressionType();
+    int numValues = _singleValue ? _numDocs : _totalNumberOfEntries; // SV: one value per doc; MV: one per entry
+    _useMMapBuffer = numValues > NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER;
+
+    // Sorted columns should never need recreation of the forward index as the forwardIndexDisabled flag is treated as
+    // a no-op for sorted columns, so only the unsorted (dictionary) and raw (no dictionary) extensions are chosen here
+    File indexDir = segmentMetadata.getIndexDir();
+    String fileExtension;
+    if (_dictionaryEnabled) {
+      fileExtension = _singleValue ? V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+    } else {
+      fileExtension = _singleValue ? V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+    }
+    _forwardIndexFile = new File(indexDir, columnName + fileExtension);
+    _forwardIndexValueBufferFile = new File(indexDir, columnName + FORWARD_INDEX_VALUE_BUFFER_SUFFIX);
+    _forwardIndexLengthBufferFile = new File(indexDir, columnName + FORWARD_INDEX_LENGTH_BUFFER_SUFFIX);
+    _forwardIndexMaxSizeBufferFile = new File(indexDir, columnName + FORWARD_INDEX_MAX_SIZE_BUFFER_SUFFIX);
+
+    // Create the temp buffers: one int per value, plus (multi-value only) one int per doc for lengths and max sizes
+    try {
+      _forwardIndexValueBuffer = createTempBuffer((long) numValues * Integer.BYTES, _forwardIndexValueBufferFile);
+      if (!_singleValue) {
+        _forwardIndexLengthBuffer = createTempBuffer((long) _numDocs * Integer.BYTES, _forwardIndexLengthBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index length buffer because we rely on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexLengthBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+        _forwardIndexMaxSizeBuffer = createTempBuffer((long) _numDocs * Integer.BYTES, _forwardIndexMaxSizeBufferFile);
+        for (int i = 0; i < _numDocs; i++) {
+          // We need to clear the forward index max size buffer because we rely on the initial value of 0, and keep
+          // updating the value instead of directly setting the value
+          _forwardIndexMaxSizeBuffer.putInt((long) i * Integer.BYTES, 0);
+        }
+      }
+    } catch (Exception e) { // NOTE(review): later buffers may still be null here; confirm destroyBuffer is null-safe
+      destroyBuffer(_forwardIndexValueBuffer, _forwardIndexValueBufferFile);
+      destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile);
+      destroyBuffer(_forwardIndexMaxSizeBuffer, _forwardIndexMaxSizeBufferFile);
+      throw new IOException("Couldn't create temp buffers to construct forward index", e);
+    }
+  }
+
+  private ChunkCompressionType getColumnCompressionType() {
+    if (!_indexLoadingConfig.getNoDictionaryColumns().contains(_columnName)) {

Review Comment:
   (nit) I think this is already checked in the constructor and stored away in `_dictionaryEnabled` so you can instead do
   
   `if (_dictionaryEnabled)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org