You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2019/03/26 17:09:55 UTC

[ignite] branch ignite-2.7.5 updated: IGNITE-10873 CorruptedTreeException during simultaneous cache put operations - Fixes #6128.

This is an automated email from the ASF dual-hosted git repository.

dpavlov pushed a commit to branch ignite-2.7.5
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.7.5 by this push:
     new 917c7e7  IGNITE-10873 CorruptedTreeException during simultaneous cache put operations - Fixes #6128.
917c7e7 is described below

commit 917c7e75420e2b756afbc72481bfd43630183266
Author: Ivan Rakov <ir...@apache.org>
AuthorDate: Thu Feb 21 11:43:59 2019 +0300

    IGNITE-10873 CorruptedTreeException during simultaneous cache put operations - Fixes #6128.
    
    Signed-off-by: Ivan Rakov <ir...@apache.org>
    
    (cherry picked from commit 81e5bee1f7ece600ba6c8e5a4bb1dcf792adcb89)
---
 .../processors/query/GridQueryProcessor.java       |  92 +++----
 .../verify/VisorValidateIndexesJobResult.java      |   8 +
 ...xingMultithreadedLoadContinuousRestartTest.java | 282 +++++++++++++++++++++
 .../testsuites/IgnitePdsWithIndexingTestSuite.java |   2 +
 4 files changed, 331 insertions(+), 53 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index accccbb..081b43d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1847,6 +1847,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         assert cctx != null;
         assert newRow != null;
         assert prevRowAvailable || prevRow == null;
+        // No need to acquire busy lock here - operation is protected by GridCacheQueryManager.busyLock
 
         KeyCacheObject key = newRow.key();
         CacheObject val = newRow.value();
@@ -1857,56 +1858,48 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (idx == null)
             return;
 
-        if (!busyLock.enterBusy())
-            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
-
-        try {
-            String cacheName = cctx.name();
+        String cacheName = cctx.name();
 
-            CacheObjectContext coctx = cctx.cacheObjectContext();
+        CacheObjectContext coctx = cctx.cacheObjectContext();
 
-            QueryTypeDescriptorImpl desc = typeByValue(cacheName, coctx, key, val, true);
+        QueryTypeDescriptorImpl desc = typeByValue(cacheName, coctx, key, val, true);
 
-            if (prevRowAvailable && prevRow != null) {
-                QueryTypeDescriptorImpl prevValDesc = typeByValue(cacheName,
-                    coctx,
-                    key,
-                    prevRow.value(),
-                    false);
+        if (prevRowAvailable && prevRow != null) {
+            QueryTypeDescriptorImpl prevValDesc = typeByValue(cacheName,
+                coctx,
+                key,
+                prevRow.value(),
+                false);
 
-                if (prevValDesc != desc) {
-                    if (prevValDesc != null)
-                        idx.remove(cctx, prevValDesc, prevRow);
+            if (prevValDesc != desc) {
+                if (prevValDesc != null)
+                    idx.remove(cctx, prevValDesc, prevRow);
 
-                    // Row has already been removed from another table indexes
-                    prevRow = null;
-                }
+                // Row has already been removed from another table indexes
+                prevRow = null;
             }
+        }
 
-            if (desc == null) {
-                int typeId = ctx.cacheObjects().typeId(val);
+        if (desc == null) {
+            int typeId = ctx.cacheObjects().typeId(val);
 
-                long missedCacheTypeKey = missedCacheTypeKey(cacheName, typeId);
+            long missedCacheTypeKey = missedCacheTypeKey(cacheName, typeId);
 
-                if (!missedCacheTypes.contains(missedCacheTypeKey)) {
-                    if (missedCacheTypes.add(missedCacheTypeKey)) {
-                        LT.warn(log, "Key-value pair is not inserted into any SQL table [cacheName=" + cacheName +
-                            ", " + describeTypeMismatch(cacheName, val) + "]");
+            if (!missedCacheTypes.contains(missedCacheTypeKey)) {
+                if (missedCacheTypes.add(missedCacheTypeKey)) {
+                    LT.warn(log, "Key-value pair is not inserted into any SQL table [cacheName=" + cacheName +
+                        ", " + describeTypeMismatch(cacheName, val) + "]");
 
-                        LT.warn(log, "  ^-- Value type(s) are specified via CacheConfiguration.indexedTypes or CacheConfiguration.queryEntities");
-                        LT.warn(log, "  ^-- Make sure that same type(s) used when adding Object or BinaryObject to cache");
-                        LT.warn(log, "  ^-- Otherwise, entries will be stored in cache, but not appear as SQL Table rows");
-                    }
+                    LT.warn(log, "  ^-- Value type(s) are specified via CacheConfiguration.indexedTypes or CacheConfiguration.queryEntities");
+                    LT.warn(log, "  ^-- Make sure that same type(s) used when adding Object or BinaryObject to cache");
+                    LT.warn(log, "  ^-- Otherwise, entries will be stored in cache, but not appear as SQL Table rows");
                 }
-
-                return;
             }
 
-            idx.store(cctx, desc, newRow, prevRow, prevRowAvailable);
-        }
-        finally {
-            busyLock.leaveBusy();
+            return;
         }
+
+        idx.store(cctx, desc, newRow, prevRow, prevRowAvailable);
     }
 
     /**
@@ -2573,31 +2566,24 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     public void remove(GridCacheContext cctx, CacheDataRow row)
         throws IgniteCheckedException {
         assert row != null;
+        // No need to acquire busy lock here - operation is protected by GridCacheQueryManager.busyLock
 
         if (log.isDebugEnabled())
-            log.debug("Remove [cacheName=" + cctx.name() + ", key=" + row.key()+ ", val=" + row.value() + "]");
+            log.debug("Remove [cacheName=" + cctx.name() + ", key=" + row.key() + ", val=" + row.value() + "]");
 
         if (idx == null)
             return;
 
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to remove from index (grid is stopping).");
+        QueryTypeDescriptorImpl desc = typeByValue(cctx.name(),
+            cctx.cacheObjectContext(),
+            row.key(),
+            row.value(),
+            false);
 
-        try {
-            QueryTypeDescriptorImpl desc = typeByValue(cctx.name(),
-                cctx.cacheObjectContext(),
-                row.key(),
-                row.value(),
-                false);
-
-            if (desc == null)
-                return;
+        if (desc == null)
+            return;
 
-                idx.remove(cctx, desc, row);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
+        idx.remove(cctx, desc, row);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
index aa74323..afd985f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
@@ -92,4 +92,12 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject {
     @Override public String toString() {
         return S.toString(VisorValidateIndexesJobResult.class, this);
     }
+
+    /**
+     * @return {@code true} If any indexes issues found on node, otherwise returns {@code false}.
+     */
+    public boolean hasIssues() {
+        return (partRes != null && partRes.entrySet().stream().anyMatch(e -> !e.getValue().issues().isEmpty())) ||
+            (idxRes != null && idxRes.entrySet().stream().anyMatch(e -> !e.getValue().issues().isEmpty()));
+    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
new file mode 100644
index 0000000..2b5d882
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IndexingMultithreadedLoadContinuousRestartTest.java
@@ -0,0 +1,282 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests that continuous non-graceful node stop under load doesn't break SQL indexes.
+ */
+public class IndexingMultithreadedLoadContinuousRestartTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "test-cache-name";
+
+    /** Restarts. */
+    private static final int RESTARTS = 5;
+
+    /** Load threads. */
+    private static final int THREADS = 5;
+
+    /** Load loop cycles. */
+    private static final int LOAD_LOOP = 5000;
+
+    /** Key bound. */
+    private static final int KEY_BOUND = 1000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        QueryEntity qryEntity = new QueryEntity();
+        qryEntity.setKeyType(UserKey.class.getName());
+        qryEntity.setValueType(UserValue.class.getName());
+        qryEntity.setTableName("USER_TEST_TABLE");
+        qryEntity.setKeyFields(new HashSet<>(Arrays.asList("a", "b", "c")));
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+        fields.put("a", "java.lang.Integer");
+        fields.put("b", "java.lang.Integer");
+        fields.put("c", "java.lang.Integer");
+        fields.put("x", "java.lang.Integer");
+        fields.put("y", "java.lang.Integer");
+        fields.put("z", "java.lang.Integer");
+        qryEntity.setFields(fields);
+
+        QueryIndex idx1 = new QueryIndex();
+        idx1.setName("IDX_1");
+        idx1.setIndexType(QueryIndexType.SORTED);
+        LinkedHashMap<String, Boolean> idxFields = new LinkedHashMap<>();
+        idxFields.put("a", false);
+        idxFields.put("b", false);
+        idxFields.put("c", false);
+        idx1.setFields(idxFields);
+
+        QueryIndex idx2 = new QueryIndex();
+        idx2.setName("IDX_2");
+        idx2.setIndexType(QueryIndexType.SORTED);
+        idxFields = new LinkedHashMap<>();
+        idxFields.put("x", false);
+        idx2.setFields(idxFields);
+
+        QueryIndex idx3 = new QueryIndex();
+        idx3.setName("IDX_3");
+        idx3.setIndexType(QueryIndexType.SORTED);
+        idxFields = new LinkedHashMap<>();
+        idxFields.put("y", false);
+        idx3.setFields(idxFields);
+
+        QueryIndex idx4 = new QueryIndex();
+        idx4.setName("IDX_4");
+        idx4.setIndexType(QueryIndexType.SORTED);
+        idxFields = new LinkedHashMap<>();
+        idxFields.put("z", false);
+        idx4.setFields(idxFields);
+
+        qryEntity.setIndexes(Arrays.asList(idx1, idx2, idx3, idx4));
+
+        cfg.setCacheConfiguration(new CacheConfiguration<Integer, Integer>()
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setName(CACHE_NAME)
+            .setQueryEntities(Collections.singleton(qryEntity)));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setInitialSize(200L * 1024 * 1024)
+                        .setMaxSize(200L * 1024 * 1024)
+                )
+        );
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Tests that continuous non-graceful node stop under load doesn't break SQL indexes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test() throws Exception {
+        for (int i = 0; i < RESTARTS; i++) {
+            IgniteEx ignite = startGrid(0);
+
+            ignite.cluster().active(true);
+
+            // Ensure that checkpoint isn't running - otherwise validate indexes task may fail.
+            forceCheckpoint();
+
+            // Validate indexes on start.
+            ValidateIndexesClosure clo = new ValidateIndexesClosure(Collections.singleton(CACHE_NAME), 0, 0);
+            ignite.context().resource().injectGeneric(clo);
+            VisorValidateIndexesJobResult res = clo.call();
+
+            assertFalse(res.hasIssues());
+
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    IgniteCache<UserKey, UserValue> cache = ignite.cache(CACHE_NAME);
+
+                    int i = 0;
+                    try {
+                        for (; i < LOAD_LOOP; i++) {
+                            ThreadLocalRandom r = ThreadLocalRandom.current();
+
+                            Integer keySeed = r.nextInt(KEY_BOUND);
+                            UserKey key = new UserKey(keySeed);
+
+                            if (r.nextBoolean())
+                                cache.put(key, new UserValue(r.nextLong()));
+                            else
+                                cache.remove(key);
+                        }
+
+                        ignite.close(); // Intentionally stop grid while another loaders are still in progress.
+                    }
+                    catch (Exception e) {
+                        log.warning("Failed to update cache after " + i + " loop cycles", e);
+                    }
+                }
+            }, THREADS, "loader");
+
+            fut.get();
+
+            ignite.close();
+        }
+    }
+
+    /**
+     * User key.
+     */
+    private static class UserKey {
+        /** A. */
+        private int a;
+
+        /** B. */
+        private int b;
+
+        /** C. */
+        private int c;
+
+        /**
+         * @param a A.
+         * @param b B.
+         * @param c C.
+         */
+        public UserKey(int a, int b, int c) {
+            this.a = a;
+            this.b = b;
+            this.c = c;
+        }
+
+        /**
+         * @param seed Seed.
+         */
+        public UserKey(long seed) {
+            a = (int)(seed % 17);
+            b = (int)(seed % 257);
+            c = (int)(seed % 3001);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "UserKey{" +
+                "a=" + a +
+                ", b=" + b +
+                ", c=" + c +
+                '}';
+        }
+    }
+
+    /**
+     * User value.
+     */
+    private static class UserValue {
+        /** X. */
+        private int x;
+
+        /** Y. */
+        private int y;
+
+        /** Z. */
+        private int z;
+
+        /**
+         * @param x X.
+         * @param y Y.
+         * @param z Z.
+         */
+        public UserValue(int x, int y, int z) {
+            this.x = x;
+            this.y = y;
+            this.z = z;
+        }
+
+        /**
+         * @param seed Seed.
+         */
+        public UserValue(long seed) {
+            x = (int)(seed % 6991);
+            y = (int)(seed % 18679);
+            z = (int)(seed % 31721);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "UserValue{" +
+                "x=" + x +
+                ", y=" + y +
+                ", z=" + z +
+                '}';
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 88e3fe7..47d2346 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingAndGroupPutGetPersistenceSelfTest;
 import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
 import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingWalRestoreTest;
@@ -48,6 +49,7 @@ public class IgnitePdsWithIndexingTestSuite extends TestSuite {
         suite.addTestSuite(IgnitePersistentStoreQueryWithMultipleClassesPerCacheTest.class);
         suite.addTestSuite(IgniteTwoRegionsRebuildIndexTest.class);
         suite.addTestSuite(IgniteTcBotInitNewPageTest.class);
+        suite.addTestSuite(IndexingMultithreadedLoadContinuousRestartTest.class);
 
         return suite;
     }