You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/22 13:53:10 UTC

[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #5722: Introduce IndexContainer in MutableSegmentImpl to reduce map lookups

mayankshriv commented on a change in pull request #5722:
URL: https://github.com/apache/incubator-pinot/pull/5722#discussion_r458805300



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -1033,13 +960,89 @@ void updateMVEntry(int numValuesInMVEntry) {
       _numValues += numValuesInMVEntry;
       _maxNumValuesPerMVEntry = Math.max(_maxNumValuesPerMVEntry, numValuesInMVEntry);
     }
+  }
+
+  private class IndexContainer implements Closeable {
+    final FieldSpec _fieldSpec;
+    final PartitionFunction _partitionFunction;
+    final int _partitionId;
+    final NumValuesInfo _numValuesInfo;
+    final MutableForwardIndex _forwardIndex;
+    final BaseMutableDictionary _dictionary;
+    final RealtimeInvertedIndexReader _invertedIndex;
+    final InvertedIndexReader _rangeIndex;
+    final RealtimeLuceneTextIndexReader _textIndex;
+    final BloomFilterReader _bloomFilter;
+    final MutableNullValueVector _nullValueVector;
+
+    volatile Comparable _minValue;
+    volatile Comparable _maxValue;
+
+    // Hold the dictionary id for the latest record
+    int _dictId = Integer.MIN_VALUE;
+    int[] _dictIds;
+
+    IndexContainer(FieldSpec fieldSpec, PartitionFunction partitionFunction, int partitionId,
+        NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex, BaseMutableDictionary dictionary,
+        RealtimeInvertedIndexReader invertedIndex, InvertedIndexReader rangeIndex,
+        RealtimeLuceneTextIndexReader textIndex, BloomFilterReader bloomFilter,
+        MutableNullValueVector nullValueVector) {
+      _fieldSpec = fieldSpec;
+      _partitionFunction = partitionFunction;
+      _partitionId = partitionId;
+      _numValuesInfo = numValuesInfo;
+      _forwardIndex = forwardIndex;
+      _dictionary = dictionary;
+      _invertedIndex = invertedIndex;
+      _rangeIndex = rangeIndex;
+      _textIndex = textIndex;
+      _bloomFilter = bloomFilter;
+      _nullValueVector = nullValueVector;
+    }
 
-    int getNumValues() {
-      return _numValues;
+    DataSource toDataSource() {
+      return new MutableDataSource(_fieldSpec, _numDocsIndexed, _numValuesInfo._numValues,
+          _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitionId, _minValue, _maxValue, _forwardIndex,
+          _dictionary, _invertedIndex, _rangeIndex, _textIndex, _bloomFilter, _nullValueVector);
     }
 
-    int getMaxNumValuesPerMVEntry() {
-      return _maxNumValuesPerMVEntry;
+    @Override
+    public void close() {
+      String column = _fieldSpec.getName();
+      try {
+        _forwardIndex.close();
+      } catch (Exception e) {
+        _logger.error("Caught exception while closing forward index for column: {}, continuing with error", column, e);
+      }
+      if (_dictionary != null) {

Review comment:
      Could we use `Optional` here to avoid the null check?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -432,202 +409,193 @@ public void addExtraColumns(Schema newSchema) {
     _logger.info("Newly added columns: " + _newlyAddedColumnsFieldMap.toString());
   }
 
+  // NOTE: Okay for single-writer
+  @SuppressWarnings("NonAtomicOperationOnVolatileField")
   @Override
   public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
-    boolean canTakeMore;
     // Update dictionary first
-    Map<String, Object> dictIdMap = updateDictionary(row);
-
-    int numDocs = _numDocsIndexed;
+    updateDictionary(row);
 
     // If metrics aggregation is enabled and if the dimension values were already seen, this will return existing docId,
     // else this will return a new docId.
-    int docId = getOrCreateDocId(dictIdMap);
-
-    // docId == numDocs implies new docId.
-    if (docId == numDocs) {
-      // Add forward and inverted indices for new document.
-      addForwardIndex(row, docId, dictIdMap);
-      addInvertedIndex(row, docId, dictIdMap);
-      if (_nullHandlingEnabled) {
-        handleNullValues(row, docId);
-      }
+    int docId = getOrCreateDocId();
 
+    boolean canTakeMore;
+    if (docId == _numDocsIndexed) {
+      // New document
+      addNewDocument(row, docId);
       // Update number of document indexed at last to make the latest record queryable
       canTakeMore = _numDocsIndexed++ < _capacity;
     } else {
-      Preconditions
-          .checkState(_aggregateMetrics, "Invalid document-id during indexing: " + docId + " expected: " + numDocs);
-      // Update metrics for existing document.
-      canTakeMore = aggregateMetrics(row, docId);
+      // Aggregate metrics for existing document
+      assert _aggregateMetrics;
+      aggregateMetrics(row, docId);
+      canTakeMore = true;
     }
 
+    // Update last indexed time and latest ingestion time
     _lastIndexedTimeMs = System.currentTimeMillis();
-
-    if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != Long.MIN_VALUE) {
+    if (rowMetadata != null) {
       _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getIngestionTimeMs());
     }
+
     return canTakeMore;
   }
 
-  private Map<String, Object> updateDictionary(GenericRow row) {
-    Map<String, Object> dictIdMap = new HashMap<>();
-    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
-      String column = fieldSpec.getName();
+  private void updateDictionary(GenericRow row) {
+    for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IndexContainer indexContainer = entry.getValue();
       Object value = row.getValue(column);
-
-      BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+      BaseMutableDictionary dictionary = indexContainer._dictionary;
       if (dictionary != null) {
-        if (fieldSpec.isSingleValueField()) {
-          dictIdMap.put(column, dictionary.index(value));
+        if (indexContainer._fieldSpec.isSingleValueField()) {
+          indexContainer._dictId = dictionary.index(value);
         } else {
-          int[] dictIds = dictionary.index((Object[]) value);
-          dictIdMap.put(column, dictIds);
-
-          // No need to update min/max time value as time column cannot be multi-valued
-          continue;
+          indexContainer._dictIds = dictionary.index((Object[]) value);
         }
+
+        // Update min/max value from dictionary
+        indexContainer._minValue = dictionary.getMinVal();
+        indexContainer._maxValue = dictionary.getMaxVal();
       }
     }
-    return dictIdMap;
   }
 
-  private void addForwardIndex(GenericRow row, int docId, Map<String, Object> dictIdMap) {
-    // Store dictionary Id(s) for columns with dictionary
-    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
-      String column = fieldSpec.getName();
+  private void addNewDocument(GenericRow row, int docId) {

Review comment:
      Nit: consider renaming this method to `addNewRow`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org