You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/21 17:28:32 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5722: Introduce IndexContainer in MutableSegmentImpl to reduce map lookups

Jackie-Jiang commented on a change in pull request #5722:
URL: https://github.com/apache/incubator-pinot/pull/5722#discussion_r458267495



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -432,202 +409,193 @@ public void addExtraColumns(Schema newSchema) {
     _logger.info("Newly added columns: " + _newlyAddedColumnsFieldMap.toString());
   }
 
+  // NOTE: Okay for single-writer
+  @SuppressWarnings("NonAtomicOperationOnVolatileField")
   @Override
   public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
-    boolean canTakeMore;
     // Update dictionary first
-    Map<String, Object> dictIdMap = updateDictionary(row);
-
-    int numDocs = _numDocsIndexed;
+    updateDictionary(row);
 
     // If metrics aggregation is enabled and if the dimension values were already seen, this will return existing docId,
     // else this will return a new docId.
-    int docId = getOrCreateDocId(dictIdMap);
-
-    // docId == numDocs implies new docId.
-    if (docId == numDocs) {
-      // Add forward and inverted indices for new document.
-      addForwardIndex(row, docId, dictIdMap);
-      addInvertedIndex(row, docId, dictIdMap);
-      if (_nullHandlingEnabled) {
-        handleNullValues(row, docId);
-      }
+    int docId = getOrCreateDocId();
 
+    boolean canTakeMore;
+    if (docId == _numDocsIndexed) {
+      // New document
+      addNewDocument(row, docId);
       // Update number of document indexed at last to make the latest record queryable
       canTakeMore = _numDocsIndexed++ < _capacity;
     } else {
-      Preconditions
-          .checkState(_aggregateMetrics, "Invalid document-id during indexing: " + docId + " expected: " + numDocs);
-      // Update metrics for existing document.
-      canTakeMore = aggregateMetrics(row, docId);
+      // Aggregate metrics for existing document
+      assert _aggregateMetrics;
+      aggregateMetrics(row, docId);
+      canTakeMore = true;
     }
 
+    // Update last indexed time and latest ingestion time
     _lastIndexedTimeMs = System.currentTimeMillis();
-
-    if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != Long.MIN_VALUE) {
+    if (rowMetadata != null) {
       _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getIngestionTimeMs());
     }
+
     return canTakeMore;
   }
 
-  private Map<String, Object> updateDictionary(GenericRow row) {
-    Map<String, Object> dictIdMap = new HashMap<>();
-    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
-      String column = fieldSpec.getName();
+  private void updateDictionary(GenericRow row) {
+    for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IndexContainer indexContainer = entry.getValue();
       Object value = row.getValue(column);
-
-      BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+      BaseMutableDictionary dictionary = indexContainer._dictionary;
       if (dictionary != null) {
-        if (fieldSpec.isSingleValueField()) {
-          dictIdMap.put(column, dictionary.index(value));
+        if (indexContainer._fieldSpec.isSingleValueField()) {
+          indexContainer._dictId = dictionary.index(value);
         } else {
-          int[] dictIds = dictionary.index((Object[]) value);
-          dictIdMap.put(column, dictIds);
-
-          // No need to update min/max time value as time column cannot be multi-valued
-          continue;
+          indexContainer._dictIds = dictionary.index((Object[]) value);
         }
+
+        // Update min/max value from dictionary
+        indexContainer._minValue = dictionary.getMinVal();
+        indexContainer._maxValue = dictionary.getMaxVal();
       }
     }
-    return dictIdMap;
   }
 
-  private void addForwardIndex(GenericRow row, int docId, Map<String, Object> dictIdMap) {
-    // Store dictionary Id(s) for columns with dictionary
-    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
-      String column = fieldSpec.getName();
+  private void addNewDocument(GenericRow row, int docId) {
+    for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IndexContainer indexContainer = entry.getValue();
       Object value = row.getValue(column);
-      NumValuesInfo numValuesInfo = _numValuesInfoMap.get(column);
+      FieldSpec fieldSpec = indexContainer._fieldSpec;
       if (fieldSpec.isSingleValueField()) {
-        // SV column
-        MutableForwardIndex mutableForwardIndex = _forwardIndexMap.get(column);
-        Integer dictId = (Integer) dictIdMap.get(column);
-        if (dictId != null) {
-          // SV Column with dictionary
-          mutableForwardIndex.setDictId(docId, dictId);
+        // Single-value column
+
+        // Update numValues info
+        indexContainer._numValuesInfo.updateSVEntry();
+
+        // Update indexes
+        MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
+        int dictId = indexContainer._dictId;
+        if (dictId >= 0) {
+          // Dictionary-encoded single-value column
+
+          // Update forward index
+          forwardIndex.setDictId(docId, dictId);
+
+          // Update inverted index
+          RealtimeInvertedIndexReader invertedIndex = indexContainer._invertedIndex;
+          if (invertedIndex != null) {
+            invertedIndex.add(dictId, docId);
+          }
         } else {
-          // No-dictionary SV column
+          // Single-value column with raw index
+
+          // Update forward index
           DataType dataType = fieldSpec.getDataType();
           switch (dataType) {
             case INT:
-              mutableForwardIndex.setInt(docId, (Integer) value);
+              forwardIndex.setInt(docId, (Integer) value);
               break;
             case LONG:
-              mutableForwardIndex.setLong(docId, (Long) value);
+              forwardIndex.setLong(docId, (Long) value);
               break;
             case FLOAT:
-              mutableForwardIndex.setFloat(docId, (Float) value);
+              forwardIndex.setFloat(docId, (Float) value);
               break;
             case DOUBLE:
-              mutableForwardIndex.setDouble(docId, (Double) value);
+              forwardIndex.setDouble(docId, (Double) value);
               break;
             case STRING:
-              mutableForwardIndex.setString(docId, (String) value);
+              forwardIndex.setString(docId, (String) value);
               break;
             case BYTES:
-              mutableForwardIndex.setBytes(docId, (byte[]) value);
+              forwardIndex.setBytes(docId, (byte[]) value);
               break;
             default:
               throw new UnsupportedOperationException(
                   "Unsupported data type: " + dataType + " for no-dictionary column: " + column);
           }
+
+          // Update min/max value from raw value
+          // NOTE: Skip updating min/max value for aggregated metrics because the value will change over time.
+          if (!_aggregateMetrics || fieldSpec.getFieldType() != FieldSpec.FieldType.METRIC) {

Review comment:
       No. When `aggregateMetrics` is disabled, we can still store the min/max value for metric columns, because their values won't change over time.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org