You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/11/03 09:40:41 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1305: IGNITE-18073 Add indexes to API for full rebalance without losing user data in MvPartitionStorage on receiver

ibessonov commented on code in PR #1305:
URL: https://github.com/apache/ignite-3/pull/1305#discussion_r1012671193


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -311,51 +313,89 @@ public void testMisconfiguredIndices() {
 
     @Test
     public void testStartRebalanceMvPartition() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalanceMvPartition(PARTITION_ID_1));
+
+        UUID hashIndexId = hashIdx.id();
+        UUID sortedIndexId = sortedIdx.id();
+
         MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId);
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexId);
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        TestKey key = new TestKey(0, "0");
+
+        BinaryRow binaryRow = binaryRow(key, new TestValue(1, "1"));
+        IndexRowImpl indexRow = new IndexRowImpl(keyValueBinaryTuple(binaryRow), rowId);
 
         partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+            partitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
 
             partitionStorage.lastAppliedIndex(100);
 
             return null;
         });
 
+        hashIndexStorage.put(indexRow);
+
+        sortedIndexStorage.put(indexRow);
+
         partitionStorage.flush().get(1, TimeUnit.SECONDS);
 
         tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
 
         MvPartitionStorage newPartitionStorage0 = tableStorage.getMvPartition(PARTITION_ID);
+        HashIndexStorage newHashIndexStorage0 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId);
+        SortedIndexStorage newSortedIndexStorage0 = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexId);
 
         assertNotNull(newPartitionStorage0);
+        assertNotNull(newHashIndexStorage0);
+        assertNotNull(newSortedIndexStorage0);
+
         assertNotSame(partitionStorage, newPartitionStorage0);
+        assertNotSame(hashIndexStorage, newHashIndexStorage0);

Review Comment:
   Why do you assert things like these? They look completely wrong. What if we have a partition listener that has a cached value of MvPartitionStorage? Should it invalidate it and ask for another one?
   Old instances must work. Please fix the test and the test implementation



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -40,50 +37,27 @@
  * Test table storage implementation.
  */
 public class TestMvTableStorage implements MvTableStorage {
-    private final Map<Integer, MvPartitionStorage> partitions = new ConcurrentHashMap<>();
+    private final Map<Integer, Storages> storagesByPartitionId = new ConcurrentHashMap<>();
 
-    private final Map<Integer, MvPartitionStorage> backupPartitions = new ConcurrentHashMap<>();
-
-    private final Map<UUID, SortedIndices> sortedIndicesById = new ConcurrentHashMap<>();
-
-    private final Map<UUID, HashIndices> hashIndicesById = new ConcurrentHashMap<>();
+    private final Map<Integer, Storages> backupStoragesByPartitionId = new ConcurrentHashMap<>();
 
     private final TableConfiguration tableCfg;
 
     private final TablesConfiguration tablesCfg;
 
     /**
-     * Class for storing Sorted Indices for a particular partition.
+     * Storages container for the partition.
      */
-    private static class SortedIndices {
-        private final SortedIndexDescriptor descriptor;
-
-        final Map<Integer, SortedIndexStorage> storageByPartitionId = new ConcurrentHashMap<>();
-
-        SortedIndices(SortedIndexDescriptor descriptor) {
-            this.descriptor = descriptor;
+    private static class Storages {

Review Comment:
   Would it be better to store indexes in the "TestMvPartitionStorage" instance? What's the ultimate purpose of a separate class?



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -116,16 +116,18 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
      * @throws StorageException If the given partition does not exist, or the given index does not exist or is not configured as a
      *         hash index.
      */
-    HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId);
+    HashIndexStorage getOrCreateHashIndex(int partitionId, UUID indexId) throws StorageException;
 
     /**
      * Destroys the index under the given name and all data in it.
      *
      * <p>This method is a no-op if the index under the given name does not exist.
      *
+     * @param partitionId Partition ID.
      * @param indexId Index ID.
+     * @throws StorageException If the given partition does not exist, or fail destroying the index.
      */
-    CompletableFuture<Void> destroyIndex(UUID indexId);
+    CompletableFuture<Void> destroyIndex(int partitionId, UUID indexId) throws StorageException;

Review Comment:
   Why did you add a partition id? What does it mean? I don't understand it.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -93,62 +67,59 @@ public TestMvTableStorage(TableConfiguration tableCfg, TablesConfiguration table
     }
 
     @Override
-    public MvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
-        return partitions.computeIfAbsent(partitionId, TestMvPartitionStorage::new);
+    public TestMvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
+        return storagesByPartitionId
+                .computeIfAbsent(partitionId, partId -> new Storages(new TestMvPartitionStorage(partId)))
+                .mvPartitionStorage;
     }
 
     @Override
     @Nullable
-    public MvPartitionStorage getMvPartition(int partitionId) {
-        return partitions.get(partitionId);
+    public TestMvPartitionStorage getMvPartition(int partitionId) {
+        Storages storages = storagesByPartitionId.get(partitionId);
+
+        return storages == null ? null : storages.mvPartitionStorage;
     }
 
     @Override
     public void destroyPartition(int partitionId) throws StorageException {
-        partitions.remove(partitionId);
-
-        sortedIndicesById.values().forEach(indices -> indices.storageByPartitionId.remove(partitionId));
-        hashIndicesById.values().forEach(indices -> indices.storageByPartitionId.remove(partitionId));
+        storagesByPartitionId.remove(partitionId);
     }
 
     @Override
-    public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) {
-        if (!partitions.containsKey(partitionId)) {
-            throw new StorageException("Partition ID " + partitionId + " does not exist");
-        }
+    public TestSortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID indexId) throws StorageException {
+        Storages storages = storagesByPartitionId.get(partitionId);
 
-        SortedIndices sortedIndices = sortedIndicesById.computeIfAbsent(
-                indexId,
-                id -> new SortedIndices(new SortedIndexDescriptor(id, tablesCfg.value()))
-        );
+        checkPartitionStoragesExists(storages, partitionId);

Review Comment:
   Should we lazily create a partition? What does the documentation say? I believe that some of storages have different behavior, I don't like that



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java:
##########
@@ -180,33 +151,51 @@ public void destroy() throws StorageException {
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
-        MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) throws StorageException {
+        Storages oldStorages = storagesByPartitionId.get(partitionId);
+
+        checkPartitionStoragesExists(oldStorages, partitionId);
 
-        assert oldPartitionStorage != null : "Partition does not exist: " + partitionId;
+        if (backupStoragesByPartitionId.putIfAbsent(partitionId, oldStorages) == null) {
+            Storages newStorages = new Storages(new TestMvPartitionStorage(partitionId));
 
-        if (backupPartitions.putIfAbsent(partitionId, oldPartitionStorage) == null) {
-            partitions.put(partitionId, new TestMvPartitionStorage(partitionId));
+            oldStorages.hashIndexes.keySet().forEach(indexId -> newStorages.hashIndexes.put(

Review Comment:
   Creating indexes is not mandatory, as far as I see. Rebalance itself will create indexes that it needs



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -164,43 +166,47 @@ default IndexStorage getOrCreateIndex(int partitionId, UUID indexId) {
     void destroy() throws StorageException;
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the current partition storage and creates a new storage.
+     * Prepares partition and index storages for rebalancing: makes backup of the partition storage and its index storages (hash and sorted)
+     * then creates new storages.
      *
-     * <p>This method must be called before every full rebalance of the partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
+     * <p>This method must be called before every full rebalance, so that in case of errors or cancellation of the full rebalance, we can
+     * restore the partition storage and its index storages from the backup.
      *
      * <p>Full rebalance will be completed when one of the methods is called:
      * <ol>
      *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
+     *     restore thepartition storage and its index storages from a backup;</li>

Review Comment:
   ```suggestion
        *     restore the partition storage and its index storages from a backup;</li>
   ```



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -311,51 +313,89 @@ public void testMisconfiguredIndices() {
 
     @Test
     public void testStartRebalanceMvPartition() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalanceMvPartition(PARTITION_ID_1));
+
+        UUID hashIndexId = hashIdx.id();
+        UUID sortedIndexId = sortedIdx.id();
+
         MvPartitionStorage partitionStorage = tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexId);
+        SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexId);
+
+        RowId rowId = new RowId(PARTITION_ID);
+
+        TestKey key = new TestKey(0, "0");
+
+        BinaryRow binaryRow = binaryRow(key, new TestValue(1, "1"));
+        IndexRowImpl indexRow = new IndexRowImpl(keyValueBinaryTuple(binaryRow), rowId);
 
         partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
+            partitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());
 
             partitionStorage.lastAppliedIndex(100);
 
             return null;
         });
 
+        hashIndexStorage.put(indexRow);

Review Comment:
   Why are you doing it without "runConsistently"? All writes should be inside of the closure.
   And index updates should be in the same closure as writes, aren't they? It's irrelevant for this particular test, but as a general rule it's fine, why violating it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org