You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/01/30 12:16:02 UTC

[ignite] 02/02: IGNITE-12593 Fix assertion during CacheDataTree update caused by byte array values and TTL - Fixes #7324.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 4593d6c11005ee5fcad70669e1ea9d8e0bafc7e5
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Thu Jan 30 14:08:00 2020 +0300

    IGNITE-12593 Fix assertion during CacheDataTree update caused by byte array values and TTL - Fixes #7324.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../processors/cache/GridCacheMapEntry.java        |   2 +
 .../cache/distributed/dht/GridDhtCacheAdapter.java |   9 +-
 .../cache/transactions/IgniteTxManager.java        |  39 ++--
 .../IgnitePdsWithTtlDeactivateOnHighloadTest.java  | 260 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   3 +
 5 files changed, 295 insertions(+), 18 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 1ad7eec..97d4fb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -6037,6 +6037,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         entry.deletedUnlocked(false);
                 }
             }
+            else if (oldVal != null && entry.deletedUnlocked())
+                entry.deletedUnlocked(false);
 
             CacheInvokeEntry<Object, Object> invokeEntry = null;
             IgniteBiTuple<Object, Exception> invokeRes = null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 139c67c..6a45ec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import javax.cache.Cache;
+import javax.cache.expiry.ExpiryPolicy;
 import java.io.Externalizable;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,8 +30,6 @@ import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
-import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -232,6 +232,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 GridCacheEntryEx entry;
 
                 while (true) {
+                    ctx.shared().database().checkpointReadLock();
+
                     try {
                         entry = ctx.dht().entryEx(k);
 
@@ -278,6 +280,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                         break;
                     }
+                    finally {
+                        ctx.shared().database().checkpointReadUnlock();
+                    }
                 }
             }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ef849cd..f5e8d8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1149,31 +1149,38 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         Collection<IgniteTxEntry> entries = tx.local() ? tx.allEntries() : tx.writeEntries();
 
         for (IgniteTxEntry entry : entries) {
-            GridCacheEntryEx cached = entry.cached();
+            cctx.database().checkpointReadLock();
 
-            GridCacheContext cacheCtx = entry.context();
+            try {
+                GridCacheEntryEx cached = entry.cached();
 
-            if (cached == null)
-                cached = cacheCtx.cache().peekEx(entry.key());
+                GridCacheContext cacheCtx = entry.context();
 
-            if (cached.detached())
-                continue;
+                if (cached == null)
+                    cached = cacheCtx.cache().peekEx(entry.key());
 
-            try {
-                if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion()))
-                    cacheCtx.cache().removeEntry(cached);
+                if (cached.detached())
+                    continue;
+
+                try {
+                    if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion()))
+                        cacheCtx.cache().removeEntry(cached);
 
-                if (!tx.near() && isNearEnabled(cacheCtx)) {
-                    GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
+                    if (!tx.near() && isNearEnabled(cacheCtx)) {
+                        GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
 
-                    GridNearCacheEntry e = near.peekExx(entry.key());
+                        GridNearCacheEntry e = near.peekExx(entry.key());
 
-                    if (e != null && e.markObsoleteIfEmpty(null))
-                        near.removeEntry(e);
+                        if (e != null && e.markObsoleteIfEmpty(null))
+                            near.removeEntry(e);
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to remove obsolete entry from cache: " + cached, e);
                 }
             }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to remove obsolete entry from cache: " + cached, e);
+            finally {
+                cctx.database().checkpointReadUnlock();
             }
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
new file mode 100644
index 0000000..5e58e5b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
@@ -0,0 +1,260 @@
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
+
+/**
+ * Test TTL worker with persistence enabled
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
+public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "expirable-cache";
+
+    /** */
+    private static final String GROUP_NAME = "group1";
+
+    /** */
+    private static final int PART_SIZE = 2;
+
+    /** */
+    private static final int EXPIRATION_TIMEOUT = 10;
+
+    /** */
+    private static final int ENTRIES = 10;
+
+    /** */
+    private static final int WORKLOAD_THREADS_CNT = 8;
+
+    /** Fail. */
+    private volatile boolean fail;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
+
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(2L * 1024 * 1024 * 1024)
+                        .setPersistenceEnabled(true)
+                ).setWalMode(WALMode.LOG_ONLY));
+
+        cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new NoOpFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+                fail = true;
+
+                return super.handle(ignite, failureCtx);
+            }
+        };
+    }
+
+    /**
+     * Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
+     *
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration getCacheConfiguration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setGroupName(GROUP_NAME);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setEagerTtl(true);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void shouldNotBeProblemToPutToExpiredCacheConcurrentlyWithCheckpoint() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+
+        ig0.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME);
+
+        AtomicBoolean timeoutReached = new AtomicBoolean(false);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+
+        IgniteInternalFuture ldrFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                Map map = new TreeMap();
+
+                for (int i = 0; i < ENTRIES; i++)
+                    map.put(i, i);
+
+                cache.putAll(map);
+            }
+        }, WORKLOAD_THREADS_CNT, "loader");
+
+        IgniteInternalFuture updaterFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                for (int i = 0; i < ENTRIES; i++)
+                    cache.put(i, i * 10);
+            }
+        }, WORKLOAD_THREADS_CNT, "updater");
+
+        IgniteInternalFuture cpWriteLockUnlockFut = runAsync(() -> {
+            ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
+
+            while (!timeoutReached.get()) {
+                try {
+                    lock.writeLock().lockInterruptibly();
+
+                    doSleep(30);
+                }
+                catch (InterruptedException ignored) {
+                    break;
+                }
+                finally {
+                    lock.writeLock().unlock();
+                }
+
+                doSleep(30);
+            }
+        }, "cp-write-lock-holder");
+
+        doSleep(10_000);
+
+        timeoutReached.set(true);
+
+        waitForAllFutures(cpWriteLockUnlockFut, ldrFut, updaterFut);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void shouldNotBeProblemToPutToExpiredCacheConcurrently() throws Exception {
+        final AtomicBoolean end = new AtomicBoolean();
+
+        final IgniteEx srv = startGrid(0);
+
+        srv.cluster().active(true);
+
+        // Start high workload
+        IgniteInternalFuture loadFut = runMultiThreadedAsync(() -> {
+            while (!end.get() && !fail) {
+                IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+
+                for (int i = 0; i < ENTRIES; i++)
+                    cache.put(i, new byte[1024]);
+
+                //Touch entries.
+                for (int i = 0; i < ENTRIES; i++)
+                    cache.get(i); // touch entries
+            }
+        }, WORKLOAD_THREADS_CNT, "high-workload");
+
+        try {
+            // Let's wait some time.
+            loadFut.get(10, TimeUnit.SECONDS);
+        }
+        catch (Exception e) {
+            assertFalse("Failure handler was called. See log above.", fail);
+
+            assertTrue(X.hasCause(e, IgniteFutureTimeoutCheckedException.class));
+        }
+        finally {
+            end.set(true);
+        }
+
+        assertFalse("Failure handler was called. See log above.", fail);
+    }
+
+    /** */
+    protected void fillCache(IgniteCache<Integer, byte[]> cache) {
+        for (int i = 0; i < ENTRIES; i++)
+            cache.put(i, new byte[1024]);
+
+        //Touch entries.
+        for (int i = 0; i < ENTRIES; i++)
+            cache.get(i); // touch entries
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index dcd60b9..d281e6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebal
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlDeactivateOnHighloadTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest;
@@ -250,5 +251,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, FsyncWalRolloverDoesNotBlockTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsPartitionsStateRecoveryTest.class, ignoredTests);
+
+        GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests);
     }
 }