Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/07 18:32:00 UTC

[GitHub] [pinot] vvivekiyer commented on a diff in pull request #9740: Add support to disable the forward index for existing columns

vvivekiyer commented on code in PR #9740:
URL: https://github.com/apache/pinot/pull/9740#discussion_r1015750072


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -120,12 +128,47 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
           createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
           break;
         }
+        case DELETE_FORWARD_INDEX: {
+          // Deletion of the forward index will be handled outside the index handler to ensure that other index
+          // handlers that need the forward index to construct their own indexes will have it available.
+          // The existing forward index must be in dictionary format for this to be a no-op.
+          _forwardIndexDisabledColumnsToCleanup.add(column);
+          break;
+        }
+        case CREATE_TEMP_DICTIONARY_BASED_FORWARD_INDEX_FROM_EXISTING_RAW_FORWARD_INDEX: {
+          // The forward index has been disabled for a column which has a noDictionary based forward index. A dictionary
+          // and inverted index need to be created before we can delete the forward index. We create a dictionary here,
+          // but let the InvertedIndexHandler handle the creation of the inverted index. We create a temporary
+          // forward index here which is dictionary based and allow the post deletion step handle the actual deletion
+          // of the forward index.
+          createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
+          Preconditions.checkState(segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX),
+              String.format("Temporary forward index was not created for column: %s", column));
+          _forwardIndexDisabledColumnsToCleanup.add(column);
+          break;
+        }
         default:
           throw new IllegalStateException("Unsupported operation for column " + column);
       }
     }
   }
 
+  @Override
+  public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
+      throws Exception {
+    // Delete the forward index for columns which have it disabled. Perform this as a post-processing step after all
+    // IndexHandlers have updated their indexes as some of them need to temporarily create a forward index to
+    // generate other indexes off of.
+    for (String column : _forwardIndexDisabledColumnsToCleanup) {
+      if ((_indexLoadingConfig.getForwardIndexDisabledColumns() != null
+          && _indexLoadingConfig.getForwardIndexDisabledColumns().contains(column))

Review Comment:
   We already did this check in `computeOperation()`. For my understanding, is it necessary to do it again here?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -62,34 +62,42 @@
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor;
+
+
 /**
  * Helper class used by {@link SegmentPreProcessor} to make changes to forward index and dictionary configs. Note
  * that this handler only works for segment versions >= 3.0. Support for segment version < 3.0 is not added because
  * majority of the usecases are in versions >= 3.0 and this avoids adding tech debt. The currently supported
  * operations are:
  * 1. Change compression on raw SV and MV columns.
+ * 2. Disable forward index on a column where it is enabled

Review Comment:
   Not related to your change - can you also add that we support "Enabling dictionary on a raw column"?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -120,12 +128,47 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
           createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
           break;
         }
+        case DELETE_FORWARD_INDEX: {
+          // Deletion of the forward index will be handled outside the index handler to ensure that other index
+          // handlers that need the forward index to construct their own indexes will have it available.
+          // The existing forward index must be in dictionary format for this to be a no-op.
+          _forwardIndexDisabledColumnsToCleanup.add(column);
+          break;
+        }
+        case CREATE_TEMP_DICTIONARY_BASED_FORWARD_INDEX_FROM_EXISTING_RAW_FORWARD_INDEX: {
+          // The forward index has been disabled for a column which has a noDictionary based forward index. A dictionary
+          // and inverted index need to be created before we can delete the forward index. We create a dictionary here,
+          // but let the InvertedIndexHandler handle the creation of the inverted index. We create a temporary
+          // forward index here which is dictionary based and allow the post deletion step handle the actual deletion
+          // of the forward index.
+          createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
+          Preconditions.checkState(segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX),
+              String.format("Temporary forward index was not created for column: %s", column));
+          _forwardIndexDisabledColumnsToCleanup.add(column);
+          break;
+        }
         default:
           throw new IllegalStateException("Unsupported operation for column " + column);
       }
     }
   }
 
+  @Override
+  public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
+      throws Exception {
+    // Delete the forward index for columns which have it disabled. Perform this as a post-processing step after all
+    // IndexHandlers have updated their indexes as some of them need to temporarily create a forward index to
+    // generate other indexes off of.
+    for (String column : _forwardIndexDisabledColumnsToCleanup) {
+      if ((_indexLoadingConfig.getForwardIndexDisabledColumns() != null

Review Comment:
   The null check is not necessary.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java:
##########
@@ -103,20 +104,27 @@ public void process()
 
       // Update single-column indices, like inverted index, json index etc.
       IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+      List<IndexHandler> indexHandlers = new ArrayList<>();

Review Comment:
   Question - why is it necessary to have this List? Can't we just loop over all index handlers again while calling `postUpdateIndicesCleanup`?
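   A minimal sketch of one possible reason the list may be needed, assuming a hypothetical `DeferredCleanupHandler` class (not Pinot code). `ForwardIndexHandler` fills `_forwardIndexDisabledColumnsToCleanup` during `updateIndices`, so `postUpdateIndicesCleanup` presumably has to run on the same handler instances. Looping over freshly constructed handlers would start with an empty cleanup set.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical handler that, like ForwardIndexHandler, records columns during the update
// phase and only deletes their forward index in a later cleanup phase.
class DeferredCleanupHandler {
  private final Set<String> disabledColumns;
  private final Set<String> columnsToCleanup = new LinkedHashSet<>();

  DeferredCleanupHandler(Set<String> disabledColumns) {
    this.disabledColumns = disabledColumns;
  }

  void updateIndices() {
    // Record the columns instead of deleting their forward index right away.
    columnsToCleanup.addAll(disabledColumns);
  }

  // Returns the columns whose forward index would be deleted.
  List<String> postUpdateIndicesCleanup() {
    return new ArrayList<>(columnsToCleanup);
  }
}

class HandlerLifecycleDemo {
  // Cleanup runs on the same instances that ran the update phase.
  static List<String> cleanupWithRetainedInstances(Set<String> disabledColumns) {
    List<DeferredCleanupHandler> handlers = new ArrayList<>();
    handlers.add(new DeferredCleanupHandler(disabledColumns));
    for (DeferredCleanupHandler handler : handlers) {
      handler.updateIndices();
    }
    List<String> deleted = new ArrayList<>();
    for (DeferredCleanupHandler handler : handlers) {
      deleted.addAll(handler.postUpdateIndicesCleanup());
    }
    return deleted;
  }

  // Cleanup runs on freshly created instances, so the state recorded during the update is lost.
  static List<String> cleanupWithFreshInstances(Set<String> disabledColumns) {
    new DeferredCleanupHandler(disabledColumns).updateIndices();
    return new DeferredCleanupHandler(disabledColumns).postUpdateIndicesCleanup();
  }

  public static void main(String[] args) {
    Set<String> disabledColumns = Set.of("col1");
    System.out.println(cleanupWithRetainedInstances(disabledColumns)); // [col1]
    System.out.println(cleanupWithFreshInstances(disabledColumns)); // []
  }
}
```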



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java:
##########
@@ -477,7 +669,13 @@ public void testRewriteRawForwardIndexForSingleColumn()
         SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
 
         List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
-        FieldConfig config = fieldConfigs.remove(i);
+        int index = -1;
+        for (int j = 0; j < fieldConfigs.size(); j++) {
+          if (fieldConfigs.get(j).getName().equals(_noDictionaryColumns.get(i))) {
+            index = j;

Review Comment:
   Thanks for fixing this.
   Should we add a `break` here?
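   A minimal sketch of the lookup loop with the suggested `break`, using plain `String` names in place of `FieldConfig` objects (a simplification; `FieldConfigLookup` and `indexOfName` are hypothetical). Without the `break` the loop still works when names are unique, but it keeps scanning after the match and would return the last match if a name ever appeared twice.

```java
import java.util.List;

class FieldConfigLookup {
  // Returns the position of the first name equal to target, or -1 if there is none.
  static int indexOfName(List<String> names, String target) {
    int index = -1;
    for (int j = 0; j < names.size(); j++) {
      if (names.get(j).equals(target)) {
        index = j;
        // Stop scanning once the match is found; this also makes the first match win.
        break;
      }
    }
    return index;
  }

  public static void main(String[] args) {
    List<String> names = List.of("dimA", "dimB", "dimC");
    System.out.println(indexOfName(names, "dimB")); // 1
    System.out.println(indexOfName(names, "missing")); // -1
  }
}
```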



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -147,11 +190,45 @@ Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
       }
     }
 
+    // Get list of columns with forward index and those without forward index
+    Set<String> existingForwardIndexColumns =
+        segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX);
+    Set<String> existingForwardIndexDisabledColumns = new HashSet<>();
+    for (String column : existingAllColumns) {
+      if (!existingForwardIndexColumns.contains(column)) {
+        existingForwardIndexDisabledColumns.add(column);
+      }
+    }
+
     // From new column config.
     Set<String> newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns();
+    Set<String> newForwardIndexDisabledColumns = _indexLoadingConfig.getForwardIndexDisabledColumns();
 
     for (String column : existingAllColumns) {
-      if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {
+      if (existingForwardIndexColumns.contains(column) && newForwardIndexDisabledColumns != null
+          && newForwardIndexDisabledColumns.contains(column)) {
+        // Existing column has a forward index. New column config disables the forward index
+        Preconditions.checkState(!newNoDictColumns.contains(column),
+            String.format("Must enable dictionary for disabling the forward index on column: %s", column));
+        Preconditions.checkState(_indexLoadingConfig.getInvertedIndexColumns().contains(column),
+            String.format("Must enable inverted index for disabling the forward index on column: %s", column));
+        if (existingDictColumns.contains(column)) {
+          columnOperationMap.put(column, Operation.DELETE_FORWARD_INDEX);
+        } else {
+          columnOperationMap.put(column,
+              Operation.CREATE_TEMP_DICTIONARY_BASED_FORWARD_INDEX_FROM_EXISTING_RAW_FORWARD_INDEX);
+        }
+      } else if (existingForwardIndexDisabledColumns.contains(column) && newForwardIndexDisabledColumns != null
+          && !newForwardIndexDisabledColumns.contains(column)) {
+        // TODO: Add support: existing column has its forward index disabled. New column config enables the forward
+        //       index
+        LOGGER.warn("Enabling forward index on a forward index disabled column {} is not yet supported", column);
+      } else if (existingForwardIndexDisabledColumns.contains(column) && newForwardIndexDisabledColumns != null
+          && newForwardIndexDisabledColumns.contains(column)) {
+        // Forward index is disabled for the existing column and should remain disabled based on the latest config
+        Preconditions.checkState(existingDictColumns.contains(column) && !newNoDictColumns.contains(column),
+            String.format("Not allowed to disable the dictionary for forward index disabled column %s", column));
+      } else if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {

Review Comment:
   (nit): can we rearrange the "if" conditions to maintain the same order as in the operation map?
   1. change raw compression type
   2. enable dictionary
   3. disable forward index
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -147,11 +190,45 @@ Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
       }
     }
 
+    // Get list of columns with forward index and those without forward index
+    Set<String> existingForwardIndexColumns =
+        segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX);
+    Set<String> existingForwardIndexDisabledColumns = new HashSet<>();
+    for (String column : existingAllColumns) {
+      if (!existingForwardIndexColumns.contains(column)) {
+        existingForwardIndexDisabledColumns.add(column);
+      }
+    }
+
     // From new column config.
     Set<String> newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns();
+    Set<String> newForwardIndexDisabledColumns = _indexLoadingConfig.getForwardIndexDisabledColumns();
 
     for (String column : existingAllColumns) {
-      if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {
+      if (existingForwardIndexColumns.contains(column) && newForwardIndexDisabledColumns != null
+          && newForwardIndexDisabledColumns.contains(column)) {
+        // Existing column has a forward index. New column config disables the forward index
+        Preconditions.checkState(!newNoDictColumns.contains(column),
+            String.format("Must enable dictionary for disabling the forward index on column: %s", column));
+        Preconditions.checkState(_indexLoadingConfig.getInvertedIndexColumns().contains(column),
+            String.format("Must enable inverted index for disabling the forward index on column: %s", column));

Review Comment:
   If the user disables the forward index on an existing raw column but doesn't enable the inverted index and dictionary, we hard-fail here with a Preconditions check (which might be OK).
   
   My question is: have we considered just making this a NO_OP in that case? I'm asking because a NO_OP is consistent with existing behavior. One example (out of many): if a user changes a column's data type today, we don't fail the segment reload; we just ignore the change (maybe with a warning). 
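   A minimal sketch of the NO_OP alternative, assuming hypothetical names (`DisableForwardIndexDecision`, `decide`) and using `java.util.logging` instead of the SLF4J logger the handler uses, to stay self-contained. The two conditions mirror the `Preconditions.checkState` calls above; whether to fail or to skip is the open question in this thread.

```java
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical sketch: when the prerequisites for disabling the forward index are missing,
// log a warning and skip the change instead of failing the reload.
class DisableForwardIndexDecision {
  private static final Logger LOGGER = Logger.getLogger(DisableForwardIndexDecision.class.getName());

  enum Decision { DISABLE_FORWARD_INDEX, NO_OP }

  static Decision decide(String column, Set<String> newNoDictColumns, Set<String> newInvertedIndexColumns) {
    boolean dictionaryEnabled = !newNoDictColumns.contains(column);
    boolean invertedIndexEnabled = newInvertedIndexColumns.contains(column);
    if (!dictionaryEnabled || !invertedIndexEnabled) {
      LOGGER.warning("Ignoring forward index disable for column " + column
          + ": dictionary and inverted index must both be enabled");
      return Decision.NO_OP;
    }
    return Decision.DISABLE_FORWARD_INDEX;
  }

  public static void main(String[] args) {
    System.out.println(decide("col", Set.of(), Set.of("col"))); // DISABLE_FORWARD_INDEX
    System.out.println(decide("col", Set.of("col"), Set.of("col"))); // NO_OP (logs a warning)
  }
}
```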



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -62,34 +62,42 @@
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor;
+
+
 /**
  * Helper class used by {@link SegmentPreProcessor} to make changes to forward index and dictionary configs. Note
  * that this handler only works for segment versions >= 3.0. Support for segment version < 3.0 is not added because
  * majority of the usecases are in versions >= 3.0 and this avoids adding tech debt. The currently supported
  * operations are:
  * 1. Change compression on raw SV and MV columns.
+ * 2. Disable forward index on a column where it is enabled
  *
  *  TODO: Add support for the following:
  *  1. Disable dictionary
  *  2. Segment versions < V3
+ *  3. Support enabling forward index on a forward index disabled column
  */
 public class ForwardIndexHandler implements IndexHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class);
 
   private final SegmentMetadata _segmentMetadata;
-  IndexLoadingConfig _indexLoadingConfig;
-  Schema _schema;
+  private final IndexLoadingConfig _indexLoadingConfig;
+  private final Schema _schema;
+  private final Set<String> _forwardIndexDisabledColumnsToCleanup;
 
   protected enum Operation {
-    // TODO: Add other operations like DISABLE_DICTIONARY,
+    // TODO: Add other operations like DISABLE_DICTIONARY, ADD_FORWARD_INDEX, ADD_RAW_FORWARD_INDEX
     CHANGE_RAW_INDEX_COMPRESSION_TYPE,
-    ENABLE_DICTIONARY
+    ENABLE_DICTIONARY,
+    DELETE_FORWARD_INDEX,

Review Comment:
   Just a thought - 
   
   Can we just have `DISABLE_FORWARD_INDEX` as the enum operation? This would invoke a function `disableForwardIndex()`, in which we can decide whether to mark the forward index for deletion or to create a temporary dictionary-based forward index and mark it for deletion.
   
   This would help keep the enum cleaner: readers would immediately be able to tell which operations we support at a high level.
   
   
   If you still prefer keeping these 2 operations in the enum, I suggest renaming them to "DISABLE_FORWARD_INDEX_FOR_RAW_COL" and "DISABLE_FORWARD_INDEX_FOR_DICT_COL".
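   A minimal sketch of the single-operation idea, assuming a hypothetical `DisableForwardIndexSketch` class that records its steps in a list instead of writing to a segment. The raw-column branch stands in for the `createDictBasedForwardIndex` call in the `CREATE_TEMP_DICTIONARY_BASED_FORWARD_INDEX_FROM_EXISTING_RAW_FORWARD_INDEX` case; both branches defer the actual deletion to the cleanup step, as the PR does.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the suggestion: a single DISABLE_FORWARD_INDEX operation whose
// handler decides internally how to treat raw vs. dictionary-encoded columns.
class DisableForwardIndexSketch {
  enum Operation { CHANGE_RAW_INDEX_COMPRESSION_TYPE, ENABLE_DICTIONARY, DISABLE_FORWARD_INDEX }

  private final Set<String> existingDictColumns;
  private final Set<String> columnsToCleanup = new LinkedHashSet<>();
  // Records the steps taken, standing in for real segment writes.
  final List<String> actions = new ArrayList<>();

  DisableForwardIndexSketch(Set<String> existingDictColumns) {
    this.existingDictColumns = existingDictColumns;
  }

  void apply(String column, Operation operation) {
    switch (operation) {
      case DISABLE_FORWARD_INDEX:
        disableForwardIndex(column);
        break;
      default:
        // The other operations are omitted from this sketch.
        throw new IllegalStateException("Unsupported operation for column " + column);
    }
  }

  private void disableForwardIndex(String column) {
    if (!existingDictColumns.contains(column)) {
      // Raw column: build a temporary dictionary-based forward index first.
      actions.add("createDictBasedForwardIndex:" + column);
    }
    // Either way, the actual deletion is deferred to the post-update cleanup step.
    columnsToCleanup.add(column);
    actions.add("markForCleanup:" + column);
  }

  Set<String> columnsToCleanup() {
    return columnsToCleanup;
  }

  public static void main(String[] args) {
    DisableForwardIndexSketch handler = new DisableForwardIndexSketch(Set.of("dictCol"));
    handler.apply("dictCol", Operation.DISABLE_FORWARD_INDEX);
    handler.apply("rawCol", Operation.DISABLE_FORWARD_INDEX);
    // [markForCleanup:dictCol, createDictBasedForwardIndex:rawCol, markForCleanup:rawCol]
    System.out.println(handler.actions);
  }
}
```

   With this shape, the enum lists only user-visible operations, and the raw-vs-dictionary detail stays an implementation concern of `disableForwardIndex()`.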



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -147,11 +190,45 @@ Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
       }
     }
 
+    // Get list of columns with forward index and those without forward index
+    Set<String> existingForwardIndexColumns =
+        segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX);
+    Set<String> existingForwardIndexDisabledColumns = new HashSet<>();
+    for (String column : existingAllColumns) {
+      if (!existingForwardIndexColumns.contains(column)) {
+        existingForwardIndexDisabledColumns.add(column);
+      }
+    }
+
     // From new column config.
     Set<String> newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns();
+    Set<String> newForwardIndexDisabledColumns = _indexLoadingConfig.getForwardIndexDisabledColumns();
 
     for (String column : existingAllColumns) {
-      if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {
+      if (existingForwardIndexColumns.contains(column) && newForwardIndexDisabledColumns != null

Review Comment:
   The `newForwardIndexDisabledColumns != null` check doesn't seem needed here. 
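   The tests in this PR call `indexLoadingConfig.getForwardIndexDisabledColumns().add(...)` on a freshly constructed `IndexLoadingConfig`, which suggests the getter returns a non-null, mutable set. Assuming that holds, the condition reduces to two `contains` calls. A minimal sketch with hypothetical classes (`LoadingConfigSketch`, `ForwardIndexDisableCheck`):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for IndexLoadingConfig: the set is created eagerly, so the getter
// never returns null and callers can call contains() without a null guard.
class LoadingConfigSketch {
  private final Set<String> forwardIndexDisabledColumns = new HashSet<>();

  Set<String> getForwardIndexDisabledColumns() {
    return forwardIndexDisabledColumns;
  }
}

class ForwardIndexDisableCheck {
  // True when the column currently has a forward index and the new config disables it.
  static boolean shouldDisable(Set<String> existingForwardIndexColumns, LoadingConfigSketch config,
      String column) {
    return existingForwardIndexColumns.contains(column)
        && config.getForwardIndexDisabledColumns().contains(column);
  }

  public static void main(String[] args) {
    LoadingConfigSketch config = new LoadingConfigSketch();
    System.out.println(shouldDisable(Set.of("col"), config, "col")); // false
    config.getForwardIndexDisabledColumns().add("col");
    System.out.println(shouldDisable(Set.of("col"), config, "col")); // true
  }
}
```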



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java:
##########
@@ -111,6 +111,11 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
     }
   }
 
+  @Override
+  public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)

Review Comment:
   Could we make this a default method in the `IndexHandler` interface and only override it in ForwardIndexHandler? That would avoid having to add an empty method to every index handler.
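   A minimal sketch of the default-method approach, using hypothetical, trimmed-down types. The real `postUpdateIndicesCleanup` takes a `SegmentDirectory.Writer` and declares `throws Exception`; both are omitted here to keep the example self-contained. Java 8+ interfaces allow a `default` body, so handlers such as the bloom filter handler inherit the no-op without declaring anything.

```java
// Hypothetical, trimmed-down IndexHandler-style interface (parameters and checked exceptions omitted).
interface IndexHandlerSketch {
  void updateIndices();

  // Default no-op, so only handlers that defer work need to override it.
  default void postUpdateIndicesCleanup() {
  }
}

// Needs no cleanup, so it does not declare postUpdateIndicesCleanup at all.
class BloomFilterHandlerSketch implements IndexHandlerSketch {
  int updateCount;

  @Override
  public void updateIndices() {
    updateCount++;
  }
}

// The only handler that overrides the cleanup hook.
class ForwardIndexHandlerSketch implements IndexHandlerSketch {
  boolean cleanedUp;

  @Override
  public void updateIndices() {
  }

  @Override
  public void postUpdateIndicesCleanup() {
    cleanedUp = true;
  }
}

class DefaultMethodDemo {
  public static void main(String[] args) {
    IndexHandlerSketch[] handlers = {new BloomFilterHandlerSketch(), new ForwardIndexHandlerSketch()};
    for (IndexHandlerSketch handler : handlers) {
      handler.updateIndices();
    }
    for (IndexHandlerSketch handler : handlers) {
      handler.postUpdateIndicesCleanup();
    }
    System.out.println(((ForwardIndexHandlerSketch) handlers[1]).cleanedUp); // true
  }
}
```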



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -120,12 +128,47 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
           createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
           break;
         }
+        case DELETE_FORWARD_INDEX: {
+          // Deletion of the forward index will be handled outside the index handler to ensure that other index
+          // handlers that need the forward index to construct their own indexes will have it available.
+          // The existing forward index must be in dictionary format for this to be a no-op.
+          _forwardIndexDisabledColumnsToCleanup.add(column);
+          break;
+        }
+        case CREATE_TEMP_DICTIONARY_BASED_FORWARD_INDEX_FROM_EXISTING_RAW_FORWARD_INDEX: {
+          // The forward index has been disabled for a column which has a noDictionary based forward index. A dictionary
+          // and inverted index need to be created before we can delete the forward index. We create a dictionary here,
+          // but let the InvertedIndexHandler handle the creation of the inverted index. We create a temporary
+          // forward index here which is dictionary based and allow the post deletion step handle the actual deletion
+          // of the forward index.
+          createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
+          Preconditions.checkState(segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX),
+              String.format("Temporary forward index was not created for column: %s", column));
+          _forwardIndexDisabledColumnsToCleanup.add(column);
+          break;
+        }
         default:
           throw new IllegalStateException("Unsupported operation for column " + column);
       }
     }
   }
 
+  @Override
+  public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
+      throws Exception {
+    // Delete the forward index for columns which have it disabled. Perform this as a post-processing step after all
+    // IndexHandlers have updated their indexes as some of them need to temporarily create a forward index to
+    // generate other indexes off of.
+    for (String column : _forwardIndexDisabledColumnsToCleanup) {
+      if ((_indexLoadingConfig.getForwardIndexDisabledColumns() != null
+          && _indexLoadingConfig.getForwardIndexDisabledColumns().contains(column))
+          && (_segmentMetadata.getColumnMetadataFor(column) != null
+          && !_segmentMetadata.getColumnMetadataFor(column).isSorted())) {

Review Comment:
   We should probably do this check in `computeOperation()` and maybe log a warning there, instead of doing it here.
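   A minimal sketch of moving the sorted-column check into `computeOperation()`, assuming hypothetical names, plain strings for operations, and a `Set` of sorted column names in place of `ColumnMetadata.isSorted()`. A sorted column requested for disabling is logged and left out of the operation map, so the cleanup step would not need to look at the column metadata again.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical sketch: skip sorted columns (with a warning) while computing operations,
// so the cleanup step can delete every recorded forward index without re-checking.
class SortedColumnFilterSketch {
  private static final Logger LOGGER = Logger.getLogger(SortedColumnFilterSketch.class.getName());

  static Map<String, String> computeOperation(Set<String> forwardIndexDisabledColumns,
      Set<String> sortedColumns) {
    Map<String, String> operations = new LinkedHashMap<>();
    for (String column : forwardIndexDisabledColumns) {
      if (sortedColumns.contains(column)) {
        LOGGER.warning("Not disabling the forward index for sorted column " + column);
        continue;
      }
      operations.put(column, "DISABLE_FORWARD_INDEX");
    }
    return operations;
  }

  public static void main(String[] args) {
    // {dimA=DISABLE_FORWARD_INDEX}
    System.out.println(computeOperation(Set.of("dimA", "sortedCol"), Set.of("sortedCol")));
  }
}
```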



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java:
##########
@@ -460,11 +538,125 @@ public void testComputeOperation()
     assertEquals(operationMap.get(config1.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
     assertEquals(operationMap.get(config2.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
 
+    // TEST10: Disable forward index for a column which already has forward index disabled
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
+
+    // TEST11: Disable forward index for a dictionary column with forward index enabled
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_DICT_INTEGER);
+    indexLoadingConfig.getInvertedIndexColumns().add(DIM_DICT_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperation(writer);
+    assertEquals(operationMap.size(), 1);
+    assertEquals(operationMap.get(DIM_DICT_INTEGER), ForwardIndexHandler.Operation.DELETE_FORWARD_INDEX);
+
+    // TEST12: Disable forward index for a dictionary column with forward index enabled, disable dictionary
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_LZ4_BYTES);
+    indexLoadingConfig.getNoDictionaryColumns().add(DIM_LZ4_BYTES);
+    indexLoadingConfig.getInvertedIndexColumns().add(DIM_LZ4_BYTES);
+    fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema);
+    ForwardIndexHandler finalFwdIndexHandler = fwdIndexHandler;
+    assertThrows(IllegalStateException.class, () -> finalFwdIndexHandler.computeOperation(writer));
+
+    // TEST13: Disable forward index for a raw column with forward index enabled and enable dictionary

Review Comment:
   (nit): enable dictionary and inverted index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org