You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2022/01/28 17:15:19 UTC

[ignite] 01/03: Add test when incomplete index tree wasn't destroyed together with the cache.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-16426
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 37fee68a28714ab119ae2ea95d7a16b3215fb7b1
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Fri Jan 28 17:46:07 2022 +0300

    Add test when incomplete index tree wasn't destroyed together with the cache.
---
 .../cache/index/ResumeCreateIndexTest.java         | 155 ++++++++++++++++++++-
 1 file changed, 154 insertions(+), 1 deletion(-)

diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeCreateIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeCreateIndexTest.java
index d352b7e..3a36801 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeCreateIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ResumeCreateIndexTest.java
@@ -24,12 +24,19 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2;
+import org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.InlineIndexTreeFactory;
+import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.BreakBuildIndexConsumer;
 import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.SlowdownBuildIndexConsumer;
 import org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.StopBuildIndexConsumer;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
 import org.apache.ignite.internal.processors.query.QueryIndexKey;
@@ -41,6 +48,9 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Test;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEX_REBUILD_BATCH_SIZE;
+import static org.apache.ignite.internal.cache.query.index.sorted.DurableBackgroundCleanupIndexTreeTaskV2.idxTreeFactory;
+import static org.apache.ignite.internal.processors.cache.index.IgniteH2IndexingEx.addIdxCreateCacheRowConsumer;
+import static org.apache.ignite.internal.processors.cache.index.IndexingTestUtils.nodeName;
 import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.COMPLETE;
 import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder.Status.INIT;
 import static org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage.KEY_PREFIX;
@@ -53,14 +63,39 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync;
  */
 @WithSystemProperty(key = IGNITE_INDEX_REBUILD_BATCH_SIZE, value = "1")
 public class ResumeCreateIndexTest extends AbstractRebuildIndexTest {
+    /** Original {@link DurableBackgroundCleanupIndexTreeTaskV2#idxTreeFactory}. */
+    private InlineIndexTreeFactory originalTaskIdxTreeFactory;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        originalTaskIdxTreeFactory = idxTreeFactory;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("AssignmentToStaticFieldFromInstanceMethod")
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        idxTreeFactory = originalTaskIdxTreeFactory;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         return super.getConfiguration(igniteInstanceName)
             .setCacheConfiguration(
-                cacheCfg(DEFAULT_CACHE_NAME, null).setAffinity(new RendezvousAffinityFunction(false, 1))
+                cacheConfig(DEFAULT_CACHE_NAME)
             );
     }
 
+    /** */
+    private <K,V> CacheConfiguration<K, V> cacheConfig(String cacheName) {
+        CacheConfiguration<K, V> ccfg = cacheCfg(cacheName, "GRP");
+
+        return ccfg.setAffinity(new RendezvousAffinityFunction(false, 1));
+    }
+
     /**
      * Checking the general flow of building a new index.
      *
@@ -328,6 +363,124 @@ public class ResumeCreateIndexTest extends AbstractRebuildIndexTest {
     }
 
     /**
+     * Checks that incomplete index is destroyed.
+     * <p>
+     * Scenario:
+     * <ol>
+     * <li> Create 2 caches in same cache group. The second one is required to prevent whole group destroy
+     * when the first one will be dropped.
+     * <li> Populate caches with some data.
+     * <li> Create new index, wait for few data being indexed and halt operation (or prevent somehow from being finished)
+     * <li> Drop cache with the incomplete index.
+     * <li> Create cache with the same name (but without index) and populate with the data.
+     * </ol>
+     * <p>
+     * TODO: Index tree must be removed from persistence on cache drop.
+     * TODO: And the put operation must not failed with corrupted tree error.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("AssignmentToStaticFieldFromInstanceMethod")
+    @Test
+    public void testIncompleteIndexDroppedOnCacheDestroy() throws Exception {
+        final String cacheName = DEFAULT_CACHE_NAME;
+        final int cacheSize = 10_000;
+
+        IgniteEx n = prepareNodeToCreateNewIndex(cacheName, cacheSize, false);
+
+        // Add one more cache to keep CacheGroup non-empty when the first cache will be destroyed during test.
+        populate(n.getOrCreateCache(cacheConfig(cacheName + "2")), 1);
+
+        IgniteEx cli = startClientGrid(1);
+
+        String idxName = "IDX0";
+        StopBuildIndexConsumer failBuildIndexConsumer = new FailBuildIndexConsumer(getTestTimeout(), 1000);
+        addIdxCreateCacheRowConsumer(nodeName(n), idxName, failBuildIndexConsumer);
+
+        IgniteInternalFuture<List<List<?>>> createIdxFut = createIdxAsync(cli.cache(cacheName), idxName);
+
+        GridFutureAdapter<Object> startCleanupFut = new GridFutureAdapter<>();
+        idxTreeFactory = treeFactory(startCleanupFut);
+
+        failBuildIndexConsumer.startBuildIdxFut.get(getTestTimeout());
+        checkInitStatus(n, cacheName, false, 1);
+        failBuildIndexConsumer.finishBuildIdxFut.onDone();
+
+        cli.cache(cacheName).destroy();
+        assertTrue(createIdxFut.isDone());
+        startCleanupFut.get(getTestTimeout());
+
+        cli.createCache(cacheConfig(cacheName));
+        populate(n.cache(cacheName), cacheSize); // <-- Failed here due to index tree wasn't drop with the old cache.
+
+        checkCompletedStatus(n, cacheName);
+
+        StopBuildIndexConsumer slowdownBuildIndexConsumer = addSlowdownIdxCreateConsumer(n, idxName, 0);
+        createIdxFut = createIdxAsync(cli.cache(cacheName), idxName);
+
+        slowdownBuildIndexConsumer.startBuildIdxFut.get(getTestTimeout());
+        checkInitStatus(n, cacheName, false, 1);
+        slowdownBuildIndexConsumer.finishBuildIdxFut.onDone();
+
+        createIdxFut.get(getTestTimeout());
+
+        checkCompletedStatus(n, cacheName);
+        assertTrue(allIndexes(n).containsKey(new QueryIndexKey(cacheName, idxName)));
+
+        assertEquals(cacheSize, selectPersonByName(n.cache(cacheName)).size());
+    }
+
+    /** */
+    private InlineIndexTreeFactory treeFactory(
+        GridFutureAdapter<Object> startFut
+    ) {
+        return new DurableBackgroundCleanupIndexTreeTaskV2.InlineIndexTreeFactory() {
+            /** {@inheritDoc} */
+            @Override protected InlineIndexTree create(
+                CacheGroupContext grpCtx,
+                RootPage rootPage,
+                String treeName
+            ) throws IgniteCheckedException {
+                    startFut.onDone();
+
+                return super.create(grpCtx, rootPage, treeName);
+            }
+        };
+    }
+
+    /**
+     * Consumer that fails building indexes of cache.
+     */
+    static class FailBuildIndexConsumer extends StopBuildIndexConsumer {
+        /** Number of rows to add before slowdown. */
+        private final int cnt;
+
+        /**
+         * Constructor.
+         *
+         * @param timeout The maximum time to wait finish future in milliseconds.
+         * @param cnt Amount of rows to be added before failure.
+         */
+        FailBuildIndexConsumer(long timeout, int cnt) {
+            super(timeout);
+
+            this.cnt = cnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
+            if (visitCnt.incrementAndGet() < cnt)
+                return;
+
+            startBuildIdxFut.onDone();
+
+            finishBuildIdxFut.get(timeout);
+
+            throw new IgniteCheckedException("test");
+        }
+    }
+
+    /**
      * Asynchronous creation of a new index for the cache of {@link Person}.
      * SQL: CREATE INDEX " + idxName + " ON Person(name)
      *