You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/01/30 12:16:02 UTC
[ignite] 02/02: IGNITE-12593 Fix assertion during CacheDataTree
update caused by byte array values and TTL - Fixes #7324.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 4593d6c11005ee5fcad70669e1ea9d8e0bafc7e5
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Thu Jan 30 14:08:00 2020 +0300
IGNITE-12593 Fix assertion during CacheDataTree update caused by byte array values and TTL - Fixes #7324.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../processors/cache/GridCacheMapEntry.java | 2 +
.../cache/distributed/dht/GridDhtCacheAdapter.java | 9 +-
.../cache/transactions/IgniteTxManager.java | 39 ++--
.../IgnitePdsWithTtlDeactivateOnHighloadTest.java | 260 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
5 files changed, 295 insertions(+), 18 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 1ad7eec..97d4fb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -6037,6 +6037,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
entry.deletedUnlocked(false);
}
}
+ else if (oldVal != null && entry.deletedUnlocked())
+ entry.deletedUnlocked(false);
CacheInvokeEntry<Object, Object> invokeEntry = null;
IgniteBiTuple<Object, Exception> invokeRes = null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 139c67c..6a45ec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import javax.cache.Cache;
+import javax.cache.expiry.ExpiryPolicy;
import java.io.Externalizable;
import java.util.Collection;
import java.util.Collections;
@@ -28,8 +30,6 @@ import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
-import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -232,6 +232,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
GridCacheEntryEx entry;
while (true) {
+ ctx.shared().database().checkpointReadLock();
+
try {
entry = ctx.dht().entryEx(k);
@@ -278,6 +280,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
break;
}
+ finally {
+ ctx.shared().database().checkpointReadUnlock();
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ef849cd..f5e8d8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1149,31 +1149,38 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
Collection<IgniteTxEntry> entries = tx.local() ? tx.allEntries() : tx.writeEntries();
for (IgniteTxEntry entry : entries) {
- GridCacheEntryEx cached = entry.cached();
+ cctx.database().checkpointReadLock();
- GridCacheContext cacheCtx = entry.context();
+ try {
+ GridCacheEntryEx cached = entry.cached();
- if (cached == null)
- cached = cacheCtx.cache().peekEx(entry.key());
+ GridCacheContext cacheCtx = entry.context();
- if (cached.detached())
- continue;
+ if (cached == null)
+ cached = cacheCtx.cache().peekEx(entry.key());
- try {
- if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion()))
- cacheCtx.cache().removeEntry(cached);
+ if (cached.detached())
+ continue;
+
+ try {
+ if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion()))
+ cacheCtx.cache().removeEntry(cached);
- if (!tx.near() && isNearEnabled(cacheCtx)) {
- GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
+ if (!tx.near() && isNearEnabled(cacheCtx)) {
+ GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
- GridNearCacheEntry e = near.peekExx(entry.key());
+ GridNearCacheEntry e = near.peekExx(entry.key());
- if (e != null && e.markObsoleteIfEmpty(null))
- near.removeEntry(e);
+ if (e != null && e.markObsoleteIfEmpty(null))
+ near.removeEntry(e);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to remove obsolete entry from cache: " + cached, e);
}
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to remove obsolete entry from cache: " + cached, e);
+ finally {
+ cctx.database().checkpointReadUnlock();
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
new file mode 100644
index 0000000..5e58e5b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
@@ -0,0 +1,260 @@
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
+
+/**
+ * Tests concurrent puts into a persistent cache with a short TTL, with and without checkpoint write lock contention.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
+public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstractTest {
+    /** Name of the cache under test. */
+    private static final String CACHE_NAME = "expirable-cache";
+
+    /** Cache group name. */
+    private static final String GROUP_NAME = "group1";
+
+    /** Partition count passed to the affinity function. */
+    private static final int PART_SIZE = 2;
+
+    /** Entry expiration timeout in milliseconds. */
+    private static final int EXPIRATION_TIMEOUT = 10;
+
+    /** Number of keys written per workload iteration. */
+    private static final int ENTRIES = 10;
+
+    /** Number of threads per workload. */
+    private static final int WORKLOAD_THREADS_CNT = 8;
+
+    /** Set once the failure handler has been invoked. */
+    private volatile boolean fail;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
+
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(2L * 1024 * 1024 * 1024)
+                        .setPersistenceEnabled(true)
+                ).setWalMode(WALMode.LOG_ONLY));
+
+        cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new NoOpFailureHandler() {
+            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+                fail = true;
+
+                return super.handle(ignite, failureCtx);
+            }
+        };
+    }
+
+    /**
+     * Returns a new cache configuration with the given name and {@code GROUP_NAME} group.
+     *
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> getCacheConfiguration(String name) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(name);
+        ccfg.setGroupName(GROUP_NAME);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setEagerTtl(true);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        return ccfg;
+    }
+
+    /**
+     * Puts from several threads while another thread repeatedly takes and releases the checkpoint write lock.
+     * @throws Exception if failed.
+     */
+    @Test
+    public void shouldNotBeProblemToPutToExpiredCacheConcurrentlyWithCheckpoint() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+
+        ig0.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME);
+
+        AtomicBoolean timeoutReached = new AtomicBoolean(false);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+
+        IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                Map<Integer, Integer> map = new TreeMap<>();
+
+                for (int i = 0; i < ENTRIES; i++)
+                    map.put(i, i);
+
+                cache.putAll(map);
+            }
+        }, WORKLOAD_THREADS_CNT, "loader");
+
+        IgniteInternalFuture<?> updaterFut = runMultiThreadedAsync(() -> {
+            while (!timeoutReached.get()) {
+                for (int i = 0; i < ENTRIES; i++)
+                    cache.put(i, i * 10);
+            }
+        }, WORKLOAD_THREADS_CNT, "updater");
+
+        IgniteInternalFuture<?> cpWriteLockUnlockFut = runAsync(() -> {
+            ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
+
+            while (!timeoutReached.get()) {
+                // Acquire outside the guarded block: unlocking a write lock that is not held throws.
+                try {
+                    lock.writeLock().lockInterruptibly();
+                }
+                catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+
+                    break;
+                }
+
+                try {
+                    doSleep(30);
+                }
+                finally {
+                    lock.writeLock().unlock();
+                }
+
+                doSleep(30);
+            }
+        }, "cp-write-lock-holder");
+
+        doSleep(10_000);
+
+        timeoutReached.set(true);
+
+        waitForAllFutures(cpWriteLockUnlockFut, ldrFut, updaterFut);
+    }
+
+    /**
+     * Puts and reads entries from several threads for ten seconds and checks the failure handler was not invoked.
+     * @throws Exception if failed.
+     */
+    @Test
+    public void shouldNotBeProblemToPutToExpiredCacheConcurrently() throws Exception {
+        final AtomicBoolean end = new AtomicBoolean();
+
+        final IgniteEx srv = startGrid(0);
+
+        srv.cluster().active(true);
+
+        IgniteInternalFuture<?> loadFut = runMultiThreadedAsync(() -> {
+            while (!end.get() && !fail)
+                fillCache(srv.cache(CACHE_NAME));
+        }, WORKLOAD_THREADS_CNT, "high-workload");
+
+        try {
+            // The workload only stops early on failure, so a timeout here is the expected outcome.
+            loadFut.get(10, TimeUnit.SECONDS);
+        }
+        catch (Exception e) {
+            assertFalse("Failure handler was called. See log above.", fail);
+
+            assertTrue(X.hasCause(e, IgniteFutureTimeoutCheckedException.class));
+        }
+        finally {
+            end.set(true);
+        }
+
+        assertFalse("Failure handler was called. See log above.", fail);
+    }
+
+    /**
+     * Puts {@code ENTRIES} values of 1024 bytes each into the cache and then reads every key back.
+     * @param cache Cache to fill.
+     */
+    protected void fillCache(IgniteCache<Integer, byte[]> cache) {
+        for (int i = 0; i < ENTRIES; i++)
+            cache.put(i, new byte[1024]);
+
+        for (int i = 0; i < ENTRIES; i++)
+            cache.get(i); // Touch the entry.
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index dcd60b9..d281e6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebal
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlDeactivateOnHighloadTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest;
@@ -250,5 +251,7 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, FsyncWalRolloverDoesNotBlockTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsPartitionsStateRecoveryTest.class, ignoredTests);
+
+ GridTestUtils.addTestIfNeeded(suite, IgnitePdsWithTtlDeactivateOnHighloadTest.class, ignoredTests);
}
}