You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/29 16:50:43 UTC

[03/50] [abbrv] ignite git commit: IGNITE-8920 - Fixed transaction hanging on Runtime exceptions during commit. - Unified StorageException and PersistentStorageIOException. - BPlus tree behavior is overridable in tests. - Refactored AccountTransferAmount

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java b/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
deleted file mode 100644
index 8d7cf15..0000000
--- a/modules/core/src/test/java/org/apache/ignite/failure/AccountTransferTransactionTest.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.failure;
-
-import javax.management.MBeanServer;
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
-import org.apache.ignite.mxbean.WorkersControlMXBean;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.Transaction;
-import org.jetbrains.annotations.NotNull;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
-/**
- * Test transfer amount between accounts with enabled {@link StopNodeFailureHandler}.
- */
-public class AccountTransferTransactionTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-    /** Count of accounts in one thread. */
-    private static final int ACCOUNTS_CNT = 20;
-    /** Count of threads and caches. */
-    private static final int THREADS_CNT = 20;
-    /** Count of nodes to start. */
-    private static final int NODES_CNT = 3;
-    /** Count of transaction on cache. */
-    private static final int TRANSACTION_CNT = 10;
-
-    /** {@inheritDoc} */
-    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
-        return new StopNodeFailureHandler();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
-        final IgniteConfiguration cfg = super.getConfiguration(name);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-        cfg.setLocalHost("127.0.0.1");
-        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
-                .setMaxSize(50 * 1024 * 1024)
-                .setPersistenceEnabled(true))
-        );
-
-        CacheConfiguration[] cacheConfigurations = new CacheConfiguration[THREADS_CNT];
-        for (int i = 0; i < THREADS_CNT; i++) {
-            cacheConfigurations[i] = new CacheConfiguration()
-                .setName(cacheName(i))
-                .setAffinity(new RendezvousAffinityFunction(false, 32))
-                .setBackups(1)
-                .setAtomicityMode(TRANSACTIONAL)
-                .setCacheMode(CacheMode.PARTITIONED)
-                .setWriteSynchronizationMode(FULL_SYNC)
-                .setEvictionPolicy(new FifoEvictionPolicy(1000))
-                .setOnheapCacheEnabled(true);
-        }
-
-        cfg.setCacheConfiguration(cacheConfigurations);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /**
-     * Test transfer amount.
-     */
-    public void testTransferAmount() throws Exception {
-        //given: started some nodes with client.
-        startGrids(NODES_CNT);
-
-        IgniteEx igniteClient = startGrid(getClientConfiguration(NODES_CNT));
-
-        igniteClient.cluster().active(true);
-
-        Random random = new Random();
-
-        long[] initAmount = new long[THREADS_CNT];
-
-        //and: fill all accounts on all caches and calculate total amount for every cache.
-        for (int cachePrefixIdx = 0; cachePrefixIdx < THREADS_CNT; cachePrefixIdx++) {
-            IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
-
-            try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                for (int accountId = 0; accountId < ACCOUNTS_CNT; accountId++) {
-                    Long amount = (long)random.nextInt(1000);
-
-                    cache.put(accountId, amount);
-
-                    initAmount[cachePrefixIdx] += amount;
-                }
-
-                tx.commit();
-            }
-        }
-
-        //when: start transfer amount from account to account in different threads.
-        CountDownLatch firstTransactionDone = new CountDownLatch(THREADS_CNT);
-
-        ArrayList<Thread> transferThreads = new ArrayList<>();
-
-        for (int i = 0; i < THREADS_CNT; i++) {
-            transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i)));
-
-            transferThreads.get(i).start();
-        }
-
-        firstTransactionDone.await(10, TimeUnit.SECONDS);
-
-        //and: terminate disco-event-worker thread on one node.
-        WorkersControlMXBean bean = workersMXBean(1);
-
-        bean.terminateWorker(
-            bean.getWorkerNames().stream()
-                .filter(name -> name.startsWith("disco-event-worker"))
-                .findFirst()
-                .orElse(null)
-        );
-
-        for (Thread thread : transferThreads) {
-            thread.join();
-        }
-
-        long[] resultAmount = new long[THREADS_CNT];
-
-        //then: calculate total amount for every thread.
-        for (int j = 0; j < THREADS_CNT; j++) {
-            String cacheName = cacheName(j);
-
-            IgniteCache<Object, Object> cache = igniteClient.getOrCreateCache(cacheName);
-
-            try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-
-                for (int i = 0; i < ACCOUNTS_CNT; i++)
-                    resultAmount[j] += getNotNullValue(cache, i);
-                tx.commit();
-            }
-
-            long diffAmount = initAmount[j] - resultAmount[j];
-
-            //and: check that result amount equal to init amount.
-            assertTrue(
-                String.format("Total amount before and after transfer is not same: diff=%s, cache=%s",
-                    diffAmount, cacheName),
-                diffAmount == 0
-            );
-        }
-    }
-
-    /**
-     * Make test cache name by prefix.
-     */
-    @NotNull private String cacheName(int cachePrefixIdx) {
-        return "cache" + cachePrefixIdx;
-    }
-
-    /**
-     * Ignite configuration for client.
-     */
-    @NotNull private IgniteConfiguration getClientConfiguration(int nodesPrefix) throws Exception {
-        IgniteConfiguration clientConf = getConfiguration(getTestIgniteInstanceName(nodesPrefix));
-
-        clientConf.setClientMode(true);
-
-        return clientConf;
-    }
-
-    /**
-     * Extract not null value from cache.
-     */
-    private long getNotNullValue(IgniteCache<Object, Object> cache, int i) {
-        Object value = cache.get(i);
-
-        return value == null ? 0 : ((Long)value);
-    }
-
-    /**
-     * Configure workers mx bean.
-     */
-    private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception {
-        ObjectName mbeanName = U.makeMBeanName(
-            getTestIgniteInstanceName(igniteInt),
-            "Kernal",
-            WorkersControlMXBeanImpl.class.getSimpleName()
-        );
-
-        MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
-
-        if (!mbeanSrv.isRegistered(mbeanName))
-            fail("MBean is not registered: " + mbeanName.getCanonicalName());
-
-        return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, WorkersControlMXBean.class, true);
-    }
-
-    /**
-     *
-     */
-    private static class TransferAmountTxThread extends Thread {
-        /** */
-        private CountDownLatch firstTransactionLatch;
-        /** */
-        private Ignite ignite;
-        /** */
-        private String cacheName;
-        /** */
-        private Random random = new Random();
-
-        /**
-         * @param ignite Ignite.
-         */
-        private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final Ignite ignite, String cacheName) {
-            this.firstTransactionLatch = firstTransactionLatch;
-            this.ignite = ignite;
-            this.cacheName = cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            for (int i = 0; i < TRANSACTION_CNT; i++) {
-                try {
-                    updateInTransaction(ignite.cache(cacheName));
-                }
-                finally {
-                    if (i == 0)
-                        firstTransactionLatch.countDown();
-                }
-            }
-        }
-
-        /**
-         * @throws IgniteException if fails
-         */
-        @SuppressWarnings("unchecked")
-        private void updateInTransaction(IgniteCache cache) throws IgniteException {
-            int accIdFrom = random.nextInt(ACCOUNTS_CNT);
-            int accIdTo = random.nextInt(ACCOUNTS_CNT);
-
-            if (accIdFrom == accIdTo)
-                accIdTo = (int)getNextAccountId(accIdFrom);
-
-            Long acctFrom;
-            Long acctTo;
-
-            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                acctFrom = (Long)cache.get(accIdFrom);
-                acctTo = (Long)cache.get(accIdTo);
-
-                long transactionAmount = (long)(random.nextDouble() * acctFrom);
-
-                cache.put(accIdFrom, acctFrom - transactionAmount);
-                cache.put(accIdTo, acctTo + transactionAmount);
-
-                tx.commit();
-            }
-        }
-
-        /**
-         * @param curr current
-         * @return random value
-         */
-        private long getNextAccountId(long curr) {
-            long randomVal;
-
-            do {
-                randomVal = random.nextInt(ACCOUNTS_CNT);
-            }
-            while (curr == randomVal);
-
-            return randomVal;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index fc2a7d6..40025f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
index 379b8c3..5a1a6fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.failure.TestFailureHandler;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index ecc7b03..3d62fe1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
new file mode 100644
index 0000000..fe27e6e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.NotNull;
+import org.jsr166.ConcurrentLinkedHashMap;
+import org.junit.Assert;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Test transfer amount between accounts with enabled {@link StopNodeFailureHandler}.
+ *
+ * This test can be extended to emulate failover scenarios during transactional operations on the grid.
+ */
+public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Count of accounts in one cache. */
+    private static final int DFLT_ACCOUNTS_CNT = 32;
+
+    /** Count of threads and caches. */
+    private static final int DFLT_TX_THREADS_CNT = 20;
+
+    /** Count of nodes to start. */
+    private static final int DFLT_NODES_CNT = 3;
+
+    /** Count of transactions on a cache. */
+    private static final int DFLT_TRANSACTIONS_CNT = 10;
+
+    /** Completed transactions, one map per account cache (indexed by cache number). */
+    private ConcurrentLinkedHashMap[] completedTxs;
+
+    /**
+     * @return Number of nodes to start in addition to the client node; {@code DFLT_NODES_CNT} unless overridden.
+     */
+    protected int nodesCount() {
+        return DFLT_NODES_CNT;
+    }
+
+    /**
+     * @return Number of accounts put into every cache; {@code DFLT_ACCOUNTS_CNT} unless overridden.
+     */
+    protected int accountsCount() {
+        return DFLT_ACCOUNTS_CNT;
+    }
+
+    /**
+     * @return Count of transactions on a cache; {@code DFLT_TRANSACTIONS_CNT} unless overridden.
+     */
+    protected int transactionsCount() {
+        return DFLT_TRANSACTIONS_CNT;
+    }
+
+    /**
+     * @return Number of transfer threads; one account cache is configured per thread.
+     */
+    protected int txThreadsCount() {
+        return DFLT_TX_THREADS_CNT;
+    }
+
+    /**
+     * @return {@code true} if indexed types should be configured for the account caches.
+     */
+    protected boolean indexed() {
+        return false;
+    }
+
+    /**
+     * @return {@code true} if persistence should be enabled for the default data region.
+     */
+    protected boolean persistent() {
+        return true;
+    }
+
+    /**
+     * @return Flag enabling cross-node transactions, i.e. transactions whose participating
+     *         primary partitions are spread across several cluster nodes.
+     */
+    protected boolean crossNodeTransactions() {
+        // Commit error during cross node transactions breaks transaction integrity
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-9086
+        return false;
+    }
+
+    /** {@inheritDoc} Every node gets its own {@link StopNodeFailureHandler} instance. */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
+    /** {@inheritDoc} Adds one TRANSACTIONAL, PARTITIONED, FULL_SYNC cache with one backup per tx thread. */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setConsistentId(name);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+        cfg.setLocalHost("127.0.0.1");
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(256 * 1024 * 1024)
+                .setPersistenceEnabled(persistent()))
+        );
+
+        CacheConfiguration[] cacheConfigurations = new CacheConfiguration[txThreadsCount()];
+
+        for (int i = 0; i < txThreadsCount(); i++) {
+            CacheConfiguration ccfg = new CacheConfiguration()
+                .setName(cacheName(i))
+                .setAffinity(new RendezvousAffinityFunction(false, accountsCount()))
+                .setBackups(1)
+                .setAtomicityMode(TRANSACTIONAL)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setWriteSynchronizationMode(FULL_SYNC)
+                .setReadFromBackup(true)
+                .setOnheapCacheEnabled(true);
+
+            if (indexed())
+                ccfg.setIndexedTypes(IgniteUuid.class, AccountState.class);
+
+            cacheConfigurations[i] = ccfg;
+        }
+
+        cfg.setCacheConfiguration(cacheConfigurations);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Stops all grids and cleans the persistence directory before the test starts. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} Stops all grids and cleans the persistence directory after the test finishes. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Builds the name of the account cache with the given index: the prefix {@code "cache"} followed by the index.
+     */
+    @NotNull private String cacheName(int cachePrefixIdx) {
+        return "cache" + cachePrefixIdx;
+    }
+
+    /**
+     * Builds the test configuration for the instance with index {@code nodesPrefix} and switches it to client mode.
+     */
+    @NotNull private IgniteConfiguration getClientConfiguration(int nodesPrefix) throws Exception {
+        IgniteConfiguration clientConf = getConfiguration(getTestIgniteInstanceName(nodesPrefix));
+
+        clientConf.setClientMode(true);
+
+        return clientConf;
+    }
+
+    /**
+     * Fills account caches, runs one transfer thread per cache and checks consistency; calls scenario hooks in between.
+     */
+    public void doTestTransferAmount(FailoverScenario failoverScenario) throws Exception {
+        failoverScenario.beforeNodesStarted();
+
+        //given: started some nodes with client.
+        startGrids(nodesCount());
+
+        IgniteEx igniteClient = startGrid(getClientConfiguration(nodesCount()));
+
+        igniteClient.cluster().active(true);
+
+        int[] initAmount = new int[txThreadsCount()];
+        completedTxs = new ConcurrentLinkedHashMap[txThreadsCount()];
+
+        //and: fill all accounts on all caches and calculate total amount for every cache.
+        for (int cachePrefixIdx = 0; cachePrefixIdx < txThreadsCount(); cachePrefixIdx++) {
+            IgniteCache<Integer, AccountState> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
+
+            AtomicInteger coinsCounter = new AtomicInteger();
+
+            try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                for (int accountId = 0; accountId < accountsCount(); accountId++) {
+                    Set<Integer> initialAmount = generateCoins(coinsCounter, 5);
+
+                    cache.put(accountId, new AccountState(accountId, tx.xid(), initialAmount));
+                }
+
+                tx.commit();
+            }
+
+            initAmount[cachePrefixIdx] = coinsCounter.get();
+            completedTxs[cachePrefixIdx] = new ConcurrentLinkedHashMap();
+        }
+
+        //when: start transfer amount from account to account in different threads.
+        CountDownLatch firstTransactionDone = new CountDownLatch(txThreadsCount());
+
+        ArrayList<Thread> transferThreads = new ArrayList<>();
+
+        for (int i = 0; i < txThreadsCount(); i++) {
+            transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i), i));
+
+            transferThreads.get(i).start();
+        }
+        // Waits at most 10 s; the result of await() is ignored, so a timeout does not fail the test.
+        firstTransactionDone.await(10, TimeUnit.SECONDS);
+
+        failoverScenario.afterFirstTransaction();
+
+        for (Thread thread : transferThreads) {
+            thread.join();
+        }
+
+        failoverScenario.afterTransactionsFinished();
+
+        consistencyCheck(initAmount);
+    }
+
+    /**
+     * Checks on every started node that each account cache holds exactly the coins generated for it: every coin
+     * in {@code 1..initAmount[j]} must be present once, i.e. no coin is lost and no coin is duplicated.
+     * @param initAmount Number of coins generated per cache, indexed by cache number.
+     */
+    private void consistencyCheck(int[] initAmount) {
+        for (Ignite node : G.allGrids()) {
+            for (int j = 0; j < txThreadsCount(); j++) {
+                String cacheName = cacheName(j);
+                IgniteCache<Integer, AccountState> cache = node.getOrCreateCache(cacheName);
+                List<Integer> totalCoins = new ArrayList<>();
+                AccountState[] accStates = new AccountState[accountsCount()];
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    for (int i = 0; i < accountsCount(); i++) {
+                        AccountState state = cache.get(i);
+
+                        Assert.assertNotNull("Account state has lost [node=" + node.name() + ", cache=" + cacheName + ", accNo=" + i + "]", state);
+
+                        totalCoins.addAll(state.coins);
+
+                        accStates[i] = state;
+                    }
+
+                    tx.commit();
+                }
+
+                // Coins were generated as 1..initAmount[j]: a coin seen twice is a duplicate, a coin never seen is lost.
+                Set<Integer> seenCoins = new HashSet<>();
+                Set<Integer> duplicateCoins = new HashSet<>();
+                Set<Integer> lostCoins = new HashSet<>();
+
+                for (Integer coin : totalCoins)
+                    if (!seenCoins.add(coin))
+                        duplicateCoins.add(coin);
+
+                for (int coin = 1; coin <= initAmount[j]; coin++)
+                    if (!seenCoins.contains(coin))
+                        lostCoins.add(coin);
+
+                // Comparing only the sizes would miss one lost coin compensated by one duplicated coin.
+                if (!lostCoins.isEmpty() || !duplicateCoins.isEmpty() || initAmount[j] != totalCoins.size()) {
+                    log.error("Transaction integrity failed for [node=" + node.name() + ", cache=" + cacheName + "]");
+
+                    log.error(String.format("Total amount of coins before and after transfers are not same. Lost coins: %s. Duplicate coins: %s.",
+                        Objects.toString(lostCoins),
+                        Objects.toString(duplicateCoins)));
+
+                    ConcurrentLinkedHashMap<IgniteUuid, TxState> txs = completedTxs[j];
+
+                    for (TxState tx : txs.values())
+                        log.error("Tx: " + tx);
+
+                    for (int i = 0; i < accountsCount(); i++)
+                        log.error("Account state " + i + " = " + accStates[i]);
+
+                    Assert.fail("Test failed. See messages above");
+                }
+            }
+        }
+    }
+
+    /**
+     * Immutable account snapshot: account id, id of the transaction that produced the state and the coins held.
+     */
+    public static class AccountState {
+        /** Account id. */
+        private final int accId;
+
+        /** Last performed transaction id on account state. */
+        @QuerySqlField(index = true)
+        private final IgniteUuid txId;
+
+        /** Coins held by the account (unmodifiable view). */
+        private final Set<Integer> coins;
+
+        /**
+         * @param accId Account id.
+         * @param txId Id of the transaction that produced this state.
+         * @param coins Coins of the account; wrapped into an unmodifiable view, not copied.
+         */
+        public AccountState(int accId, IgniteUuid txId, Set<Integer> coins) {
+            this.txId = txId;
+            this.coins = Collections.unmodifiableSet(coins);
+            this.accId = accId;
+        }
+
+        /**
+         * @param random Randomizer.
+         * @return Randomly sized strict subset of the coins (possibly empty); an empty account yields an empty set.
+         */
+        public Set<Integer> coinsToTransfer(Random random) {
+            // Random.nextInt(0) throws IllegalArgumentException, so an empty account transfers nothing.
+            int coinsNum = coins.isEmpty() ? 0 : random.nextInt(coins.size());
+            return coins.stream().limit(coinsNum).collect(Collectors.toSet());
+        }
+
+        /**
+         * @param txId Transaction id.
+         * @param coinsToAdd Coins to add to current account.
+         * @return Account state with added coins.
+         */
+        public AccountState addCoins(IgniteUuid txId, Set<Integer> coinsToAdd) {
+            return new AccountState(accId, txId, Sets.union(coins, coinsToAdd).immutableCopy());
+        }
+
+        /**
+         * @param txId Transaction id.
+         * @param coinsToRemove Coins to remove from current account.
+         * @return Account state with removed coins.
+         */
+        public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRemove) {
+            return new AccountState(accId, txId, Sets.difference(coins, coinsToRemove).immutableCopy());
+        }
+
+        /** {@inheritDoc} NOTE(review): compares txId and coins only, accId is ignored - confirm this is intended. */
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            AccountState that = (AccountState) o;
+            return Objects.equals(txId, that.txId) &&
+                Objects.equals(coins, that.coins);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(txId, coins);
+        }
+
+        /** {@inheritDoc} Includes txId so that dumped states can be matched with logged transactions. */
+        @Override public String toString() {
+            return "AccountState{" +
+                "accId=" + accId + ", txId=" + txId +
+                ", coins=" + coins +
+                '}';
+        }
+    }
+
+    /**
+     * @param coinsNum Coins number: how many times {@code coinsCounter} is incremented; each new counter value is one coin.
+     */
+    private Set<Integer> generateCoins(AtomicInteger coinsCounter, int coinsNum) {
+        Set<Integer> res = new HashSet<>();
+
+        for (int i = 0; i < coinsNum; i++)
+            res.add(coinsCounter.incrementAndGet());
+
+        return res;
+    }
+
+    /**
+     * Record of one transaction between two accounts: both account states before and after it, plus the coins moved.
+     */
+    static class TxState {
+        /**
+         * Account states before transaction ({@code before1} is the source account when created by the transfer thread).
+         */
+        AccountState before1, before2;
+
+        /**
+         * Account states after transaction, in the same account order as the "before" states.
+         */
+        AccountState after1, after2;
+
+        /**
+         * Transferred coins between accounts during this transaction.
+         */
+        Set<Integer> transferredCoins;
+
+        /**
+         * @param before1 State of the first account before the transaction.
+         * @param before2 State of the second account before the transaction.
+         * @param after1 State of the first account after the transaction.
+         * @param after2 State of the second account after the transaction.
+         * @param transferredCoins Coins transferred between the two accounts.
+         */
+        public TxState(AccountState before1, AccountState before2, AccountState after1, AccountState after2, Set<Integer> transferredCoins) {
+            this.before1 = before1;
+            this.before2 = before2;
+            this.after1 = after1;
+            this.after2 = after2;
+            this.transferredCoins = transferredCoins;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "TxState{" +
+                "before1=" + before1 +
+                ", before2=" + before2 +
+                ", transferredCoins=" + transferredCoins +
+                ", after1=" + after1 +
+                ", after2=" + after2 +
+                '}';
+        }
+    }
+
+    /**
+     * Thread that runs {@code transactionsCount()} coin transfers between randomly chosen accounts of one cache.
+     */
+    private class TransferAmountTxThread extends Thread {
+        /** Counted down after this thread's first transfer attempt, whether it succeeded or threw. */
+        private CountDownLatch firstTransactionLatch;
+        /** Node used to start transactions and to access the cache. */
+        private IgniteEx ignite;
+        /** Name of the cache that stores the account states. */
+        private String cacheName;
+        /** Index of the {@code completedTxs} slot where this thread records its committed transfers. */
+        private int txIndex;
+        /** Randomizer used to pick accounts and coins. */
+        private Random random = new Random();
+
+        /**
+         * @param ignite Ignite node this thread starts its transactions on.
+         */
+        private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final IgniteEx ignite, String cacheName, int txIndex) {
+            this.firstTransactionLatch = firstTransactionLatch;
+            this.ignite = ignite;
+            this.cacheName = cacheName;
+            this.txIndex = txIndex;
+        }
+
+        /** {@inheritDoc} Performs the transfers one by one; the latch is released after the first attempt even if it threw. */
+        @Override public void run() {
+            for (int i = 0; i < transactionsCount(); i++) {
+                try {
+                    updateInTransaction(ignite.cache(cacheName));
+                }
+                finally {
+                    if (i == 0)
+                        firstTransactionLatch.countDown();
+                }
+            }
+        }
+
+        /**
+         * @throws IgniteException If the transfer transaction fails.
+         */
+        @SuppressWarnings("unchecked")
+        private void updateInTransaction(IgniteCache<Integer, AccountState> cache) throws IgniteException {
+            int accIdFrom;
+            int accIdTo;
+
+            for (;;) {
+                accIdFrom = random.nextInt(accountsCount());
+                accIdTo = random.nextInt(accountsCount());
+
+                if (accIdFrom == accIdTo)
+                    continue;
+
+                ClusterNode primaryForAccFrom = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdFrom);
+                ClusterNode primaryForAccTo = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdTo);
+
+                // When cross-node transactions are disabled, accept only account pairs mapped to the same node.
+                if (!crossNodeTransactions() && !primaryForAccFrom.id().equals(primaryForAccTo.id()))
+                    continue;
+
+                break;
+            }
+
+            AccountState acctFrom;
+            AccountState acctTo;
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                acctFrom = cache.get(accIdFrom);
+                acctTo = cache.get(accIdTo);
+
+                Set<Integer> coinsToTransfer = acctFrom.coinsToTransfer(random);
+
+                AccountState nextFrom = acctFrom.removeCoins(tx.xid(), coinsToTransfer);
+                AccountState nextTo = acctTo.addCoins(tx.xid(), coinsToTransfer);
+
+                cache.put(accIdFrom, nextFrom);
+                cache.put(accIdTo, nextTo);
+
+                tx.commit();
+
+                completedTxs[txIndex].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer));
+            }
+        }
+
+        /**
+         * @param curr Account id that must not be returned.
+         * @return Random account id in range {@code [0, accountsCount())} that differs from {@code curr}.
+         */
+        private long getNextAccountId(long curr) {
+            long randomVal;
+
+            do {
+                randomVal = random.nextInt(accountsCount());
+            }
+            while (curr == randomVal);
+
+            return randomVal;
+        }
+    }
+
+    /**
+     * Interface to implement custom failover scenario during transactional amount transfer; every callback is optional.
+     */
+    public interface FailoverScenario {
+        /**
+         * Callback before nodes have started. Does nothing by default.
+         */
+        public default void beforeNodesStarted() throws Exception { }
+
+        /**
+         * Callback when first transaction has finished. Does nothing by default.
+         */
+        public default void afterFirstTransaction() throws Exception { }
+
+        /**
+         * Callback when all transactions have finished. Does nothing by default.
+         */
+        public default void afterTransactionsFinished() throws Exception { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
new file mode 100644
index 0000000..3260607
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.cache.tree.SearchRow;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test cases that check transaction data integrity after transaction commit failed.
+ */
+public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends AbstractTransactionIntergrityTest {
+    /** Corruption enabled flag; wrapped page handlers inject errors only while it is {@code true}. */
+    private static volatile boolean corruptionEnabled;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        corruptionEnabled = false;
+        BPlusTree.pageHndWrapper = (tree, hnd) -> hnd; // Restore pass-through even if the scenario aborted early.
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60 * 1000L;
+    }
+
+    /**
+     * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+     */
+    public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode1() throws Exception {
+        doTestTransferAmount(new IndexCorruptionFailoverScenario(
+            true,
+            (hnd, tree) -> hnd instanceof BPlusTree.Search,
+            failoverPredicate(true, () -> new AssertionError("Test")))
+        );
+    }
+
+    /**
+     * Throws a test {@link RuntimeException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+     */
+    public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode2() throws Exception {
+        doTestTransferAmount(new IndexCorruptionFailoverScenario(
+            true,
+            (hnd, tree) -> hnd instanceof BPlusTree.Search,
+            failoverPredicate(true, () -> new RuntimeException("Test")))
+        );
+    }
+
+    /**
+     * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+     */
+    public void testPrimaryIndexCorruptionDuringCommitOnBackupNode() throws Exception {
+        doTestTransferAmount(new IndexCorruptionFailoverScenario(
+            true,
+            (hnd, tree) -> hnd instanceof BPlusTree.Search,
+            failoverPredicate(false, () -> new AssertionError("Test")))
+        );
+    }
+
+    /**
+     * Throws a test {@link IgniteCheckedException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
+     */
+    public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode3() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-9082");
+
+        doTestTransferAmount(new IndexCorruptionFailoverScenario(
+            false,
+            (hnd, tree) -> hnd instanceof BPlusTree.Search,
+            failoverPredicate(true, () -> new IgniteCheckedException("Test")))
+        );
+    }
+
+    /**
+     * Creates failover predicate which generates error during transaction commit.
+     * @param failOnPrimary If {@code true} fail on the node that is primary for the row's partition, otherwise on non-primary nodes.
+     * @param errorSupplier Supplier to create various errors.
+     * @return Function producing the error to throw for a row on a given node, or {@code null} if nothing should be thrown.
+     */
+    private BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate(
+        boolean failOnPrimary,
+        Supplier<Throwable> errorSupplier
+    ) {
+        return (ignite, row) -> {
+            int cacheId = row.cacheId();
+            int partId = row.key().partition();
+
+            final ClusterNode locNode = ignite.localNode();
+            final AffinityTopologyVersion curTopVer = ignite.context().discovery().topologyVersionEx();
+
+            // Produce an error if the local node's primary status for the row's partition equals failOnPrimary.
+            return ignite.cachesx(c -> c.context().cacheId() == cacheId)
+                .stream()
+                .filter(c -> c.context().affinity().primaryByPartition(locNode, partId, curTopVer) == failOnPrimary)
+                .map(c -> errorSupplier.get())
+                .findFirst()
+                .orElse(null);
+        };
+    }
+
+    /**
+     * Index corruption failover scenario.
+     */
+    class IndexCorruptionFailoverScenario implements FailoverScenario {
+        /** Failed node index. */
+        static final int failedNodeIdx = 1;
+
+        /** Is node stopping expected after failover. */
+        private final boolean nodeStoppingExpected;
+
+        /** Predicate that will choose an instance of {@link BPlusTree} and page operation
+         * to make further failover in this tree using {@link #failoverPredicate}. */
+        private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate;
+
+        /** Function that may return error during row insertion into {@link BPlusTree}. */
+        private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate;
+
+        /**
+         * @param nodeStoppingExpected Node stopping expected.
+         * @param treeCorruptionPredicate Tree corruption predicate.
+         * @param failoverPredicate Failover predicate.
+         */
+        IndexCorruptionFailoverScenario(
+            boolean nodeStoppingExpected,
+            BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate,
+            BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate
+        ) {
+            this.nodeStoppingExpected = nodeStoppingExpected;
+            this.treeCorruptionPredicate = treeCorruptionPredicate;
+            this.failoverPredicate = failoverPredicate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void beforeNodesStarted() {
+            BPlusTree.pageHndWrapper = (tree, hnd) -> {
+                final IgniteEx locIgnite = (IgniteEx) Ignition.localIgnite();
+
+                if (!locIgnite.name().endsWith(String.valueOf(failedNodeIdx)))
+                    return hnd;
+
+                if (treeCorruptionPredicate.apply(hnd, tree)) {
+                    log.info("Created corrupted tree handler for -> " + hnd + " " + tree);
+
+                    PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>) hnd;
+
+                    return new PageHandler<BPlusTree.Get, BPlusTree.Result>() {
+                        @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io, Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException {
+                            log.info("Invoked " + " " + cacheId + " " + arg.toString() + " for BTree (" + corruptionEnabled + ") -> " + arg.row() + " / " + arg.row().getClass());
+
+                            if (corruptionEnabled && (arg.row() instanceof SearchRow)) {
+                                SearchRow row = (SearchRow) arg.row();
+
+                                // Store cacheId to search row explicitly, as it can be zero if there is one cache in a group.
+                                Throwable res = failoverPredicate.apply(locIgnite, new SearchRow(cacheId, row.key()));
+
+                                if (res instanceof Error)
+                                    throw (Error) res;
+                                else if (res instanceof RuntimeException)
+                                    throw (RuntimeException) res;
+                                else if (res instanceof IgniteCheckedException)
+                                    throw (IgniteCheckedException) res;
+                                else if (res != null) // Other checked types cannot be rethrown as-is: wrap, never drop.
+                                    throw new IgniteCheckedException(res);
+                            }
+
+                            return delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, lvl);
+                        }
+
+                        @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr, BPlusTree.Get g, int lvl) {
+                            return g.canRelease(pageId, lvl);
+                        }
+                    };
+                }
+
+                return hnd;
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public void afterFirstTransaction() {
+            // Enable BPlus tree corruption after first transactions have finished.
+            corruptionEnabled = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void afterTransactionsFinished() throws Exception {
+            // Disable index corruption.
+            BPlusTree.pageHndWrapper = (tree, hnd) -> hnd;
+
+            if (nodeStoppingExpected) {
+                // Wait until the node with the corrupted index leaves the cluster.
+                GridTestUtils.waitForCondition(() -> {
+                    try {
+                        grid(failedNodeIdx);
+                    }
+                    catch (IgniteIllegalStateException e) {
+                        return true;
+                    }
+
+                    return false;
+                }, getTestTimeout());
+
+                // Failed node should be stopped.
+                GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, "");
+
+                // Re-start failed node.
+                startGrid(failedNodeIdx);
+
+                awaitPartitionMapExchange();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
new file mode 100644
index 0000000..25aae4b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
+import org.apache.ignite.mxbean.WorkersControlMXBean;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Checks transaction data integrity when the {@code disco-event-worker} system worker is terminated on one node.
+ */
+public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTransactionIntergrityTest {
+    /** {@inheritDoc}. */
+    @Override protected long getTestTimeout() {
+        return 60 * 1000L;
+    }
+
+    /** {@inheritDoc}. */
+    @Override protected boolean persistent() {
+        return false;
+    }
+
+    /**
+     * Terminates the discovery event worker on one node after the first transaction and expects that node to stop.
+     */
+    public void testFailoverWithDiscoWorkerTermination() throws Exception {
+        doTestTransferAmount(new FailoverScenario() {
+            static final int failedNodeIdx = 1;
+
+            /** {@inheritDoc}. */
+            @Override public void afterFirstTransaction() throws Exception {
+                // Terminate disco-event-worker thread on one node.
+                WorkersControlMXBean bean = workersMXBean(failedNodeIdx);
+
+                bean.terminateWorker(
+                    bean.getWorkerNames().stream()
+                        .filter(name -> name.startsWith("disco-event-worker"))
+                        .findFirst()
+                        .orElseThrow(() -> new AssertionError("disco-event-worker not found on node " + failedNodeIdx))
+                );
+            }
+
+            /** {@inheritDoc}. */
+            @Override public void afterTransactionsFinished() throws Exception {
+                // Wait until the node with the dead worker leaves the cluster.
+                GridTestUtils.waitForCondition(() -> {
+                    try {
+                        grid(failedNodeIdx);
+                    }
+                    catch (IgniteIllegalStateException e) {
+                        return true;
+                    }
+
+                    return false;
+                }, getTestTimeout());
+
+                // Failed node should be stopped.
+                GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, "");
+
+                // Re-start failed node.
+                startGrid(failedNodeIdx);
+
+                awaitPartitionMapExchange();
+            }
+        });
+    }
+
+    /**
+     * Returns a proxy to the workers control MX bean of the node with index {@code igniteInt}; fails if it is not registered.
+     */
+    private WorkersControlMXBean workersMXBean(int igniteInt) throws Exception {
+        ObjectName mbeanName = U.makeMBeanName(
+            getTestIgniteInstanceName(igniteInt),
+            "Kernal",
+            WorkersControlMXBeanImpl.class.getSimpleName()
+        );
+
+        MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+        if (!mbeanSrv.isRegistered(mbeanName))
+            fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+        return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, WorkersControlMXBean.class, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 74e23ed..ac2bed3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testsuites;
 
+import java.util.Set;
 import junit.framework.TestSuite;
 import org.apache.ignite.GridSuppressedExceptionSelfTest;
 import org.apache.ignite.failure.FailureHandlerTriggeredTest;
@@ -55,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestor
 import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExchangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
 import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithSystemWorkerDeathTest;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessorRemoteTest;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest;
 import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest;
@@ -90,8 +92,6 @@ import org.apache.ignite.testframework.test.VariationsIteratorTest;
 import org.apache.ignite.util.AttributeNodeFilterSelfTest;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.Set;
-
 /**
  * Basic test suite.
  */
@@ -211,6 +211,7 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(StopNodeFailureHandlerTest.class);
         suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class);
         suite.addTestSuite(OomFailureHandlerTest.class);
+        suite.addTestSuite(TransactionIntegrityWithSystemWorkerDeathTest.class);
 
         suite.addTestSuite(AtomicOperationsInTxTest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 5f1d18d..3171754 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.testsuites;
 
+import java.util.Set;
 import junit.framework.TestSuite;
-import org.apache.ignite.failure.AccountTransferTransactionTest;
 import org.apache.ignite.failure.IoomFailureHandlerTest;
 import org.apache.ignite.failure.SystemWorkersTerminationTest;
 import org.apache.ignite.internal.ClusterBaselineNodesMetricsSelfTest;
@@ -29,8 +29,6 @@ import org.apache.ignite.util.GridCommandHandlerTest;
 import org.apache.ignite.util.GridInternalTaskUnusedWalSegmentsTest;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.Set;
-
 /**
  * Basic test suite.
  */
@@ -52,7 +50,6 @@ public class IgniteBasicWithPersistenceTestSuite extends TestSuite {
         TestSuite suite = new TestSuite("Ignite Basic With Persistence Test Suite");
 
         suite.addTestSuite(IoomFailureHandlerTest.class);
-        suite.addTestSuite(AccountTransferTransactionTest.class);
         suite.addTestSuite(ClusterBaselineNodesMetricsSelfTest.class);
         suite.addTestSuite(ServiceDeploymentOnActivationTest.class);
         suite.addTestSuite(ServiceDeploymentOutsideBaselineTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index f9e6b81..1269d0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -26,13 +26,11 @@ import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDist
 import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
 import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheClientsConcurrentStartTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMultithreadedTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientMultiNodeUpdateTopologyLockTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
@@ -45,11 +43,11 @@ import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPa
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest;
-import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest;
 import org.apache.ignite.testframework.junits.GridAbstractTest;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dde936ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 89e2ffc..2bd0861 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.testsuites;
 
+import java.util.Set;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.authentication.Authentication1kUsersNodeRestartTest;
 import org.apache.ignite.internal.processors.authentication.AuthenticationConfigurationClusterTest;
@@ -38,11 +39,10 @@ import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridC
 import org.apache.ignite.internal.processors.cache.eviction.paged.PageEvictionMultinodeMixedRegionsTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.CheckpointBufferDeadlockTest;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithPrimaryIndexCorruptionTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncWithPersistenceTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest;
 
-import java.util.Set;
-
 /**
  * Test suite.
  */
@@ -95,6 +95,8 @@ public class IgniteCacheTestSuite7 extends TestSuite {
 
         suite.addTestSuite(CacheRentingStateRepairTest.class);
 
+        suite.addTestSuite(TransactionIntegrityWithPrimaryIndexCorruptionTest.class);
+
         return suite;
     }
 }