You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by jo...@apache.org on 2019/05/23 17:23:55 UTC

[ignite] branch master updated: IGNITE-11592 Correctly handle cache stop in transactions

This is an automated email from the ASF dual-hosted git repository.

jokser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 07addc2  IGNITE-11592 Correctly handle cache stop in transactions
07addc2 is described below

commit 07addc2660feceabf0fe7190ba9d8c83be80d4e9
Author: zstan <st...@gmail.com>
AuthorDate: Thu May 23 20:21:53 2019 +0300

    IGNITE-11592 Correctly handle cache stop in transactions
    
    Signed-off-by: Pavel Kovalenko <jo...@gmail.com>
---
 .../processors/cache/GridCacheProcessor.java       |  31 ++
 .../processors/cache/GridCacheSharedContext.java   |   3 +-
 .../dht/GridDhtTopologyFutureAdapter.java          |   4 +
 .../persistence/file/FilePageStoreManager.java     |   2 +-
 .../cache/transactions/IgniteTxEntry.java          |   8 +-
 .../cache/transactions/IgniteTxHandler.java        |  27 +-
 .../cache/transactions/IgniteTxManager.java        |  32 +++
 .../cache/transactions/IgniteTxStateImpl.java      |   7 +
 .../cache/transactions/TransactionProxyImpl.java   |   3 -
 .../cache/transactions/TxOnCachesStopTest.java     | 316 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   2 +
 11 files changed, 422 insertions(+), 13 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 05a0bf6..28bfa81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -128,7 +128,9 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryMana
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactionsImpl;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
@@ -3276,11 +3278,40 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 ((GridServiceProcessor)ctx.service()).updateUtilityCache();
         }
 
+        rollbackCoveredTx(exchActions);
+
         if (err == null)
             processCacheStopRequestOnExchangeDone(exchActions);
     }
 
     /**
+     * Rollback tx covered by stopped caches.
+     *
+     * @param exchActions Change requests.
+     */
+    private void rollbackCoveredTx(ExchangeActions exchActions) {
+        if (!exchActions.cacheGroupsToStop().isEmpty() || !exchActions.cacheStopRequests().isEmpty()) {
+            Set<Integer> cachesToStop = new HashSet<>();
+
+            for (ExchangeActions.CacheGroupActionData act : exchActions.cacheGroupsToStop()) {
+                @Nullable CacheGroupContext grpCtx = context().cache().cacheGroup(act.descriptor().groupId());
+
+                if (grpCtx != null && grpCtx.sharedGroup())
+                    cachesToStop.addAll(grpCtx.cacheIds());
+            }
+
+            for (ExchangeActions.CacheActionData act : exchActions.cacheStopRequests())
+                cachesToStop.add(act.descriptor().cacheId());
+
+            if (!cachesToStop.isEmpty()) {
+                IgniteTxManager tm = context().tm();
+
+                tm.rollbackTransactionsForCaches(cachesToStop);
+            }
+        }
+    }
+
+    /**
      * @param grpId Group ID.
      */
     private void stopCacheGroup(int grpId) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 783939c..50b598b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -1046,10 +1046,9 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @param tx Transaction to rollback.
-     * @throws IgniteCheckedException If failed.
      * @return Rollback future.
      */
-    public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException {
+    public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) {
         boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
 
         if (clearThreadMap)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 88b99b7..33b20fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -88,6 +88,10 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
             return new CacheInvalidStateException(
                 "Failed to perform cache operation (cluster is not activated): " + cctx.name());
 
+        if (cctx.cache() == null)
+            return new CacheInvalidStateException(
+                "Failed to perform cache operation (cache is stopped): " + cctx.name());
+
         OperationType opType = read ? OperationType.READ : WRITE;
 
         CacheGroupContext grp = cctx.group();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 3bf4db8..8ab5033 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -1086,7 +1086,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
 
         if (file.exists()) {
             if (!file.delete())
-                throw new IgniteCheckedException("Failed to delete cache configuration:" + cacheCfg.getName());
+                throw new IgniteCheckedException("Failed to delete cache configuration: " + cacheCfg.getName());
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index a105a2f..b8d596c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -940,9 +941,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         if (this.ctx == null) {
             GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(cacheId);
 
-            assert cacheCtx != null : "Failed to find cache context [cacheId=" + cacheId +
-                ", readyTopVer=" + ctx.exchange().readyAffinityVersion() + ']';
-
+            if (cacheCtx == null)
+                throw new CacheInvalidStateException(
+                    "Failed to perform cache operation (cache is stopped), cacheId=" + cacheId);
+            
             if (cacheCtx.isNear() && !near)
                 cacheCtx = cacheCtx.near().dht().context();
             else if (!cacheCtx.isNear() && near)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 787fb97..8846115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -108,6 +108,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTI
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
+import static org.apache.ignite.internal.util.lang.GridFunc.isEmpty;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -162,13 +163,13 @@ public class IgniteTxHandler {
      * @param nearNode Sender node.
      * @param req Request.
      */
-    private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
+    private IgniteInternalFuture<GridNearTxPrepareResponse> processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) {
         IgniteInternalFuture<GridNearTxPrepareResponse> fut;
 
         if (req.firstClientRequest() && req.allowWaitTopologyFuture()) {
             for (;;) {
                 if (waitForExchangeFuture(nearNode, req))
-                    return;
+                    return new GridFinishedFuture<>();
 
                 fut = prepareNearTx(nearNode, req);
 
@@ -181,6 +182,8 @@ public class IgniteTxHandler {
 
         assert req.txState() != null || fut == null || fut.error() != null ||
             (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
+
+        return fut;
     }
 
     /**
@@ -348,7 +351,7 @@ public class IgniteTxHandler {
      * @param req Request.
      * @return Prepare future.
      */
-    public IgniteInternalFuture<GridNearTxPrepareResponse>  prepareNearTxLocal(
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTxLocal(
         final GridNearTxLocal originTx,
         final GridNearTxPrepareRequest req) {
         // Make sure not to provide Near entries to DHT cache.
@@ -623,10 +626,26 @@ public class IgniteTxHandler {
                                 return;
                             }
                             ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, () -> {
+                                IgniteInternalFuture<GridNearTxPrepareResponse> fut = null;
+
+                                Throwable err = null;
+
                                 try {
-                                    processNearTxPrepareRequest0(node, req);
+                                    for (IgniteTxEntry itm : F.concat(false, req.writes(), req.reads())) {
+                                        err = topFut.validateCache(itm.context(), req.recovery(), isEmpty(req.writes()),
+                                            null, null);
+
+                                        if (err != null)
+                                            break;
+                                    }
+
+                                    if (err == null)
+                                        fut = processNearTxPrepareRequest0(node, req);
                                 }
                                 finally {
+                                    if (fut == null || fut.error() != null || err != null)
+                                        sendResponseOnTimeoutOrError(e, topFut, node, req);
+
                                     ctx.io().onMessageProcessed(req);
                                 }
                             });
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 06633b4..87bd905 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -321,6 +321,38 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param cachesToStop Caches to stop.
+     */
+    public void rollbackTransactionsForCaches(Set<Integer> cachesToStop) {
+        if (!cachesToStop.isEmpty()) {
+            IgniteTxManager tm = context().tm();
+
+            Collection<IgniteInternalTx> active = tm.activeTransactions();
+
+            GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> compFut = new GridCompoundFuture<>();
+
+            for (IgniteInternalTx tx : active) {
+                for (IgniteTxEntry e : tx.allEntries()) {
+                    if (cachesToStop.contains(e.context().cacheId())) {
+                        compFut.add(tx.rollbackAsync());
+
+                        break;
+                    }
+                }
+            }
+
+            compFut.markInitialized();
+
+            try {
+                compFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Error occured during tx rollback.", e);
+            }
+        }
+    }
+
+    /**
      * Rollback transactions blocking partition map exchange.
      *
      * @param topVer Initial exchange version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index b5a0539..402085c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -24,8 +24,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -123,6 +125,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         for (int i = 0; i < activeCacheIds.size(); i++) {
             int cacheId = activeCacheIds.get(i);
 
+            if (cctx.cacheContext(cacheId) == null)
+                throw new IgniteException("Cache is stopped, id=" + cacheId);
+
             cctx.cacheContext(cacheId).cache().awaitLastFut();
         }
     }
@@ -394,6 +399,8 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
 
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
+            assert cacheCtx != null : "cacheCtx == null, cacheId=" + cacheId;
+
             onTxEnd(cacheCtx, tx, commit);
         }
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 628b813..110f34d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -363,9 +363,6 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
         try {
             return (IgniteFuture<Void>)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx)));
         }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
         finally {
             leave();
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java
new file mode 100644
index 0000000..5086f28
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CacheStoppedException;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ *
+ */
+public class TxOnCachesStopTest extends GridCommonAbstractTest {
+    /** Cache1 name. */
+    private static final String CACHE_1_NAME = "cache1";
+
+    /** Cache2 name. */
+    private static final String CACHE_2_NAME = "cache2";
+
+    /** rnd instance. */
+    private static final GridRandom rnd = new GridRandom();
+
+    /** */
+    private CacheConfiguration<Integer, byte[]> destroyCacheCfg;
+
+    /** */
+    private CacheConfiguration<Integer, byte[]> surviveCacheCfg;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+        cfg.setCommunicationSpi(commSpi);
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024).setPersistenceEnabled(true))
+            .setWalMode(WALMode.LOG_ONLY);
+
+        cfg.setDataStorageConfiguration(memCfg);
+
+        CacheConfiguration<Integer, byte[]> ccfg1 = new CacheConfiguration<>();
+
+        ccfg1.setName(CACHE_1_NAME);
+        ccfg1.setBackups(1);
+        ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+        destroyCacheCfg = ccfg1;
+
+        CacheConfiguration<Integer, byte[]> ccfg2 = new CacheConfiguration<>();
+
+        ccfg2.setName(CACHE_2_NAME);
+        ccfg2.setBackups(1);
+        ccfg2.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg2.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+        surviveCacheCfg = ccfg2;
+
+        cfg.setCacheConfiguration(ccfg1, ccfg2);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        grid(0).destroyCache(destroyCacheCfg.getName());
+        grid(0).destroyCache(surviveCacheCfg.getName());
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTxOnCacheStopNoMessageBlock() throws Exception {
+        testTxOnCacheStop(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTxOnCacheStopWithMessageBlock() throws Exception {
+        testTxOnCacheStop(true);
+    }
+
+    /**
+     * @param block {@code True} To block GridNearTxPrepareRequest message.
+     */
+    public void testTxOnCacheStop(boolean block) throws Exception {
+        startGridsMultiThreaded(2);
+
+        Ignition.setClientMode(true);
+
+        IgniteEx ig = startGrid("client");
+
+        ig.cluster().active(true);
+
+        for (TransactionConcurrency conc : TransactionConcurrency.values())
+            for (TransactionIsolation iso : TransactionIsolation.values())
+                runTxOnCacheStop(conc, iso, ig, block);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTxOnCacheStopInMid() throws Exception {
+        startGridsMultiThreaded(2);
+
+        Ignition.setClientMode(true);
+
+        IgniteEx ig = startGrid("client");
+
+        ig.cluster().active(true);
+
+        for (TransactionConcurrency conc : TransactionConcurrency.values())
+            for (TransactionIsolation iso : TransactionIsolation.values())
+                runCacheStopInMidTx(conc, iso, ig);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void runTxOnCacheStop(TransactionConcurrency conc, TransactionIsolation iso, Ignite ig, boolean runConc)
+        throws Exception {
+        if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc()))
+            return;
+
+        CountDownLatch destroyLatch = new CountDownLatch(1);
+
+        final IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(destroyCacheCfg);
+
+        final IgniteCache<Integer, byte[]> cache2 = ig.getOrCreateCache(surviveCacheCfg);
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ig);
+
+        IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+            try {
+                destroyLatch.await();
+
+                IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+                    doSleep(rnd.nextInt(500));
+
+                    spi.stopBlock();
+                });
+
+                cache.destroy();
+
+                f.get();
+            }
+            catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        spi.blockMessages((node, msg) -> {
+            if (msg instanceof GridNearTxPrepareRequest) {
+                destroyLatch.countDown();
+
+                return runConc;
+            }
+
+            return false;
+        });
+
+        IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
+            byte[] val = new byte[1024];
+
+            try (Transaction tx = ig.transactions().txStart(conc, iso, 1_000, 2)) {
+                cache.put(100, val);
+
+                cache2.put(100, val);
+
+                tx.commit();
+            }
+            catch (IgniteException e) {
+                assertTrue(X.hasCause(e, IgniteTxTimeoutCheckedException.class)
+                    || X.hasCause(e, CacheInvalidStateException.class) || X.hasCause(e, IgniteException.class));
+            }
+        });
+
+        f1.get();
+        f0.get();
+
+        try {
+            assertEquals(cache2.get(100), cache.get(100));
+        }
+        catch (IllegalStateException e) {
+            assertTrue(X.hasCause(e, CacheStoppedException.class));
+        }
+
+        spi.stopBlock();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void runCacheStopInMidTx(TransactionConcurrency conc, TransactionIsolation iso, Ignite ig) throws Exception {
+        if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc()))
+            return;
+
+        CountDownLatch destroyLatch = new CountDownLatch(1);
+
+        CountDownLatch putLatch = new CountDownLatch(1);
+
+        final IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(destroyCacheCfg);
+
+        final IgniteCache<Integer, byte[]> cache2 = ig.getOrCreateCache(surviveCacheCfg);
+
+        IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> {
+            try {
+                putLatch.await();
+
+                cache.destroy();
+
+                destroyLatch.countDown();
+            }
+            catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> {
+            byte[] val = new byte[1024];
+
+            try (Transaction tx = ig.transactions().txStart(conc, iso, 1_000, 2)) {
+                cache.put(100, val);
+
+                cache2.put(100, val);
+
+                putLatch.countDown();
+
+                destroyLatch.await();
+
+                tx.commit();
+            }
+            catch (IgniteException e) {
+                assertTrue(X.hasCause(e, CacheInvalidStateException.class) ||
+                    X.hasCause(e, CacheStoppedException.class) || X.hasCause(e, TransactionRollbackException.class) ||
+                    X.hasCause(e, IgniteException.class));
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+        });
+
+        f1.get();
+        f0.get();
+
+        assertNull(cache2.get(100));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 5bbd7b2..f2e7806 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxMultiCacheAsyncOpsTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStartTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStopTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest;
@@ -104,6 +105,7 @@ public class IgniteCacheTestSuite6 {
         GridTestUtils.addTestIfNeeded(suite, TxMultiCacheAsyncOpsTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, TxOnCachesStartTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, TxOnCachesStopTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgniteCache150ClientsTest.class, ignoredTests);