You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by jo...@apache.org on 2019/05/23 17:23:55 UTC
[ignite] branch master updated: IGNITE-11592 Correctly handle cache stop in transactions
This is an automated email from the ASF dual-hosted git repository.
jokser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 07addc2 IGNITE-11592 Correctly handle cache stop in transactions
07addc2 is described below
commit 07addc2660feceabf0fe7190ba9d8c83be80d4e9
Author: zstan <st...@gmail.com>
AuthorDate: Thu May 23 20:21:53 2019 +0300
IGNITE-11592 Correctly handle cache stop in transactions
Signed-off-by: Pavel Kovalenko <jo...@gmail.com>
---
.../processors/cache/GridCacheProcessor.java | 31 ++
.../processors/cache/GridCacheSharedContext.java | 3 +-
.../dht/GridDhtTopologyFutureAdapter.java | 4 +
.../persistence/file/FilePageStoreManager.java | 2 +-
.../cache/transactions/IgniteTxEntry.java | 8 +-
.../cache/transactions/IgniteTxHandler.java | 27 +-
.../cache/transactions/IgniteTxManager.java | 32 +++
.../cache/transactions/IgniteTxStateImpl.java | 7 +
.../cache/transactions/TransactionProxyImpl.java | 3 -
.../cache/transactions/TxOnCachesStopTest.java | 316 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite6.java | 2 +
11 files changed, 422 insertions(+), 13 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 05a0bf6..28bfa81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -128,7 +128,9 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryMana
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactionsImpl;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
@@ -3276,11 +3278,40 @@ public class GridCacheProcessor extends GridProcessorAdapter {
((GridServiceProcessor)ctx.service()).updateUtilityCache();
}
+ rollbackCoveredTx(exchActions);
+
if (err == null)
processCacheStopRequestOnExchangeDone(exchActions);
}
/**
+ * Rolls back transactions that involve caches which are being stopped.
+ *
+ * @param exchActions Change requests.
+ */
+ private void rollbackCoveredTx(ExchangeActions exchActions) {
+ if (!exchActions.cacheGroupsToStop().isEmpty() || !exchActions.cacheStopRequests().isEmpty()) {
+ Set<Integer> cachesToStop = new HashSet<>();
+
+ for (ExchangeActions.CacheGroupActionData act : exchActions.cacheGroupsToStop()) {
+ @Nullable CacheGroupContext grpCtx = context().cache().cacheGroup(act.descriptor().groupId());
+
+ if (grpCtx != null && grpCtx.sharedGroup())
+ cachesToStop.addAll(grpCtx.cacheIds());
+ }
+
+ for (ExchangeActions.CacheActionData act : exchActions.cacheStopRequests())
+ cachesToStop.add(act.descriptor().cacheId());
+
+ if (!cachesToStop.isEmpty()) {
+ IgniteTxManager tm = context().tm();
+
+ tm.rollbackTransactionsForCaches(cachesToStop);
+ }
+ }
+ }
+
+ /**
* @param grpId Group ID.
*/
private void stopCacheGroup(int grpId) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 783939c..50b598b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -1046,10 +1046,9 @@ public class GridCacheSharedContext<K, V> {
/**
* @param tx Transaction to rollback.
- * @throws IgniteCheckedException If failed.
* @return Rollback future.
*/
- public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException {
+ public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) {
boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
if (clearThreadMap)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 88b99b7..33b20fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -88,6 +88,10 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
return new CacheInvalidStateException(
"Failed to perform cache operation (cluster is not activated): " + cctx.name());
+ if (cctx.cache() == null)
+ return new CacheInvalidStateException(
+ "Failed to perform cache operation (cache is stopped): " + cctx.name());
+
OperationType opType = read ? OperationType.READ : WRITE;
CacheGroupContext grp = cctx.group();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 3bf4db8..8ab5033 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -1086,7 +1086,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
if (file.exists()) {
if (!file.delete())
- throw new IgniteCheckedException("Failed to delete cache configuration:" + cacheCfg.getName());
+ throw new IgniteCheckedException("Failed to delete cache configuration: " + cacheCfg.getName());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index a105a2f..b8d596c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -940,9 +941,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
if (this.ctx == null) {
GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(cacheId);
- assert cacheCtx != null : "Failed to find cache context [cacheId=" + cacheId +
- ", readyTopVer=" + ctx.exchange().readyAffinityVersion() + ']';
-
+ if (cacheCtx == null)
+ throw new CacheInvalidStateException(
+ "Failed to perform cache operation (cache is stopped), cacheId=" + cacheId);
+
if (cacheCtx.isNear() && !near)
cacheCtx = cacheCtx.near().dht().context();
else if (!cacheCtx.isNear() && near)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 787fb97..8846115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -108,6 +108,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTI
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
+import static org.apache.ignite.internal.util.lang.GridFunc.isEmpty;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -162,13 +163,13 @@ public class IgniteTxHandler {
* @param nearNode Sender node.
* @param req Request.
*/
- private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
+ private IgniteInternalFuture<GridNearTxPrepareResponse> processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
IgniteInternalFuture<GridNearTxPrepareResponse> fut;
if (req.firstClientRequest() && req.allowWaitTopologyFuture()) {
for (;;) {
if (waitForExchangeFuture(nearNode, req))
- return;
+ return new GridFinishedFuture<>();
fut = prepareNearTx(nearNode, req);
@@ -181,6 +182,8 @@ public class IgniteTxHandler {
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
+
+ return fut;
}
/**
@@ -348,7 +351,7 @@ public class IgniteTxHandler {
* @param req Request.
* @return Prepare future.
*/
- public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTxLocal(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTxLocal(
final GridNearTxLocal originTx,
final GridNearTxPrepareRequest req) {
// Make sure not to provide Near entries to DHT cache.
@@ -623,10 +626,26 @@ public class IgniteTxHandler {
return;
}
ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, () -> {
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = null;
+
+ Throwable err = null;
+
try {
- processNearTxPrepareRequest0(node, req);
+ for (IgniteTxEntry itm : F.concat(false, req.writes(), req.reads())) {
+ err = topFut.validateCache(itm.context(), req.recovery(), isEmpty(req.writes()),
+ null, null);
+
+ if (err != null)
+ break;
+ }
+
+ if (err == null)
+ fut = processNearTxPrepareRequest0(node, req);
}
finally {
+ if (fut == null || fut.error() != null || err != null)
+ sendResponseOnTimeoutOrError(e, topFut, node, req);
+
ctx.io().onMessageProcessed(req);
}
});
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 06633b4..87bd905 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -321,6 +321,38 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cachesToStop IDs of the caches to stop.
+ */
+ public void rollbackTransactionsForCaches(Set<Integer> cachesToStop) {
+ if (!cachesToStop.isEmpty()) {
+ IgniteTxManager tm = context().tm();
+
+ Collection<IgniteInternalTx> active = tm.activeTransactions();
+
+ GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> compFut = new GridCompoundFuture<>();
+
+ for (IgniteInternalTx tx : active) {
+ for (IgniteTxEntry e : tx.allEntries()) {
+ if (cachesToStop.contains(e.context().cacheId())) {
+ compFut.add(tx.rollbackAsync());
+
+ break;
+ }
+ }
+ }
+
+ compFut.markInitialized();
+
+ try {
+ compFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Error occured during tx rollback.", e);
+ }
+ }
+ }
+
+ /**
* Rollback transactions blocking partition map exchange.
*
* @param topVer Initial exchange version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index b5a0539..402085c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -24,8 +24,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+
import org.apache.ignite.IgniteCacheRestartingException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -123,6 +125,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
for (int i = 0; i < activeCacheIds.size(); i++) {
int cacheId = activeCacheIds.get(i);
+ if (cctx.cacheContext(cacheId) == null)
+ throw new IgniteException("Cache is stopped, id=" + cacheId);
+
cctx.cacheContext(cacheId).cache().awaitLastFut();
}
}
@@ -394,6 +399,8 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ assert cacheCtx != null : "cacheCtx == null, cacheId=" + cacheId;
+
onTxEnd(cacheCtx, tx, commit);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 628b813..110f34d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -363,9 +363,6 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
try {
return (IgniteFuture<Void>)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx)));
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
leave();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java
new file mode 100644
index 0000000..5086f28
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CacheStoppedException;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ *
+ */
+public class TxOnCachesStopTest extends GridCommonAbstractTest {
+ /** Cache1 name. */
+ private static final String CACHE_1_NAME = "cache1";
+
+ /** Cache2 name. */
+ private static final String CACHE_2_NAME = "cache2";
+
+ /** Random generator used to delay the unblocking of messages. */
+ private static final GridRandom rnd = new GridRandom();
+
+ /** */
+ private CacheConfiguration<Integer, byte[]> destroyCacheCfg;
+
+ /** */
+ private CacheConfiguration<Integer, byte[]> surviveCacheCfg;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+ cfg.setCommunicationSpi(commSpi);
+
+ DataStorageConfiguration memCfg = new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024).setPersistenceEnabled(true))
+ .setWalMode(WALMode.LOG_ONLY);
+
+ cfg.setDataStorageConfiguration(memCfg);
+
+ CacheConfiguration<Integer, byte[]> ccfg1 = new CacheConfiguration<>();
+
+ ccfg1.setName(CACHE_1_NAME);
+ ccfg1.setBackups(1);
+ ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+ destroyCacheCfg = ccfg1;
+
+ CacheConfiguration<Integer, byte[]> ccfg2 = new CacheConfiguration<>();
+
+ ccfg2.setName(CACHE_2_NAME);
+ ccfg2.setBackups(1);
+ ccfg2.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg2.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+ surviveCacheCfg = ccfg2;
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ grid(0).destroyCache(destroyCacheCfg.getName());
+ grid(0).destroyCache(surviveCacheCfg.getName());
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTxOnCacheStopNoMessageBlock() throws Exception {
+ testTxOnCacheStop(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTxOnCacheStopWithMessageBlock() throws Exception {
+ testTxOnCacheStop(true);
+ }
+
+ /**
+ * @param block {@code True} to block {@link GridNearTxPrepareRequest} messages.
+ */
+ public void testTxOnCacheStop(boolean block) throws Exception {
+ startGridsMultiThreaded(2);
+
+ Ignition.setClientMode(true);
+
+ IgniteEx ig = startGrid("client");
+
+ ig.cluster().active(true);
+
+ for (TransactionConcurrency conc : TransactionConcurrency.values())
+ for (TransactionIsolation iso : TransactionIsolation.values())
+ runTxOnCacheStop(conc, iso, ig, block);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTxOnCacheStopInMid() throws Exception {
+ startGridsMultiThreaded(2);
+
+ Ignition.setClientMode(true);
+
+ IgniteEx ig = startGrid("client");
+
+ ig.cluster().active(true);
+
+ for (TransactionConcurrency conc : TransactionConcurrency.values())
+ for (TransactionIsolation iso : TransactionIsolation.values())
+ runCacheStopInMidTx(conc, iso, ig);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void runTxOnCacheStop(TransactionConcurrency conc, TransactionIsolation iso, Ignite ig, boolean runConc)
+ throws Exception {
+ if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc()))
+ return;
+
+ CountDownLatch destroyLatch = new CountDownLatch(1);
+
+ final IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(destroyCacheCfg);
+
+ final IgniteCache<Integer, byte[]> cache2 = ig.getOrCreateCache(surviveCacheCfg);
+
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ig);
+
+ IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+ try {
+ destroyLatch.await();
+
+ IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+ doSleep(rnd.nextInt(500));
+
+ spi.stopBlock();
+ });
+
+ cache.destroy();
+
+ f.get();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ spi.blockMessages((node, msg) -> {
+ if (msg instanceof GridNearTxPrepareRequest) {
+ destroyLatch.countDown();
+
+ return runConc;
+ }
+
+ return false;
+ });
+
+ IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
+ byte[] val = new byte[1024];
+
+ try (Transaction tx = ig.transactions().txStart(conc, iso, 1_000, 2)) {
+ cache.put(100, val);
+
+ cache2.put(100, val);
+
+ tx.commit();
+ }
+ catch (IgniteException e) {
+ assertTrue(X.hasCause(e, IgniteTxTimeoutCheckedException.class)
+ || X.hasCause(e, CacheInvalidStateException.class) || X.hasCause(e, IgniteException.class));
+ }
+ });
+
+ f1.get();
+ f0.get();
+
+ try {
+ assertEquals(cache2.get(100), cache.get(100));
+ }
+ catch (IllegalStateException e) {
+ assertTrue(X.hasCause(e, CacheStoppedException.class));
+ }
+
+ spi.stopBlock();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void runCacheStopInMidTx(TransactionConcurrency conc, TransactionIsolation iso, Ignite ig) throws Exception {
+ if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc()))
+ return;
+
+ CountDownLatch destroyLatch = new CountDownLatch(1);
+
+ CountDownLatch putLatch = new CountDownLatch(1);
+
+ final IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(destroyCacheCfg);
+
+ final IgniteCache<Integer, byte[]> cache2 = ig.getOrCreateCache(surviveCacheCfg);
+
+ IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+ try {
+ putLatch.await();
+
+ cache.destroy();
+
+ destroyLatch.countDown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
+ byte[] val = new byte[1024];
+
+ try (Transaction tx = ig.transactions().txStart(conc, iso, 1_000, 2)) {
+ cache.put(100, val);
+
+ cache2.put(100, val);
+
+ putLatch.countDown();
+
+ destroyLatch.await();
+
+ tx.commit();
+ }
+ catch (IgniteException e) {
+ assertTrue(X.hasCause(e, CacheInvalidStateException.class) ||
+ X.hasCause(e, CacheStoppedException.class) || X.hasCause(e, TransactionRollbackException.class) ||
+ X.hasCause(e, IgniteException.class));
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ });
+
+ f1.get();
+ f0.get();
+
+ assertNull(cache2.get(100));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 5bbd7b2..f2e7806 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
import org.apache.ignite.internal.processors.cache.transactions.TxMultiCacheAsyncOpsTest;
import org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStartTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStopTest;
import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest;
@@ -104,6 +105,7 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite, TxMultiCacheAsyncOpsTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxOnCachesStartTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, TxOnCachesStopTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCache150ClientsTest.class, ignoredTests);