You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/10/18 11:57:04 UTC

[1/3] ignite git commit: IGNITE-9082 Throwing checked exception during tx commit without node stopping leads to data corruption - Fixes #4809.

Repository: ignite
Updated Branches:
  refs/heads/master 829dc1f24 -> 5eb871e19


http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java
new file mode 100644
index 0000000..881c680
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testsuites.IgniteIgnore;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests data consistency if transaction is failed due to heuristic exception on originating node.
+ */
+public class TxDataConsistencyOnCommitFailureTest extends GridCommonAbstractTest {
+    /** */
+    public static final int KEY = 0;
+
+    /** */
+    public static final String CLIENT = "client";
+
+    /** */
+    private int nodesCnt;
+
+    /** */
+    private int backups;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setClientMode(igniteInstanceName.startsWith(CLIENT));
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+            setCacheMode(CacheMode.PARTITIONED).
+            setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
+            setBackups(backups).
+            setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** */
+    public void testCommitErrorOnNearNode2PC() throws Exception {
+        nodesCnt = 3;
+
+        backups = 2;
+
+        doTestCommitError(() -> {
+            try {
+                return startGrid("client");
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /** */
+    public void testCommitErrorOnNearNode1PC() throws Exception {
+        nodesCnt = 2;
+
+        backups = 1;
+
+        doTestCommitError(() -> {
+            try {
+                return startGrid("client");
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /** */
+    @IgniteIgnore(value = "https://issues.apache.org/jira/browse/IGNITE-9806", forceFailure = false)
+    public void testCommitErrorOnColocatedNode2PC() throws Exception {
+        nodesCnt = 3;
+
+        backups = 2;
+
+        doTestCommitError(() -> primaryNode(KEY, DEFAULT_CACHE_NAME));
+    }
+
+    /**
+     * @param factory Factory.
+     */
+    private void doTestCommitError(Supplier<Ignite> factory) throws Exception {
+        Ignite crd = startGridsMultiThreaded(nodesCnt);
+
+        crd.cache(DEFAULT_CACHE_NAME).put(KEY, KEY);
+
+        Ignite ignite = factory.get();
+
+        if (ignite == null)
+            ignite = startGrid("client");
+
+        assertNotNull(ignite.cache(DEFAULT_CACHE_NAME));
+
+        injectMockedTxManager(ignite);
+
+        checkKey();
+
+        IgniteTransactions transactions = ignite.transactions();
+
+        try(Transaction tx = transactions.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, 0, 1)) {
+            assertNotNull(transactions.tx());
+
+            ignite.cache(DEFAULT_CACHE_NAME).put(KEY, KEY + 1);
+
+            tx.commit();
+
+            fail();
+        }
+        catch (Exception t) {
+            // No-op.
+        }
+
+        checkKey();
+
+        checkFutures();
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    private void injectMockedTxManager(Ignite ignite) {
+        IgniteEx igniteEx = (IgniteEx)ignite;
+
+        GridCacheSharedContext<Object, Object> ctx = igniteEx.context().cache().context();
+
+        IgniteTxManager tm = ctx.tm();
+
+        IgniteTxManager mockTm = Mockito.spy(tm);
+
+        MockGridNearTxLocal locTx = new MockGridNearTxLocal(ctx, false, false, false, GridIoPolicy.SYSTEM_POOL,
+            TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ, 0, true, null, 1, null, 0, null);
+
+        Mockito.doAnswer(new Answer<GridNearTxLocal>() {
+            @Override public GridNearTxLocal answer(InvocationOnMock invocation) throws Throwable {
+                mockTm.onCreated(null, locTx);
+
+                return locTx;
+            }
+        }).when(mockTm).
+            newTx(locTx.implicit(), locTx.implicitSingle(), null, locTx.concurrency(),
+                locTx.isolation(), locTx.timeout(), locTx.storeEnabled(), null, locTx.size(), locTx.label());
+
+        ctx.setTxManager(mockTm);
+    }
+
+    /** */
+    private void checkKey() {
+        for (Ignite ignite : G.allGrids()) {
+            if (!ignite.configuration().isClientMode())
+                assertNotNull(ignite.cache(DEFAULT_CACHE_NAME).localPeek(KEY));
+        }
+    }
+
+    /** */
+    private static class MockGridNearTxLocal extends GridNearTxLocal {
+        /** Empty constructor. */
+        public MockGridNearTxLocal() {
+        }
+
+        /**
+         * @param ctx Context.
+         * @param implicit Implicit.
+         * @param implicitSingle Implicit single.
+         * @param sys System.
+         * @param plc Policy.
+         * @param concurrency Concurrency.
+         * @param isolation Isolation.
+         * @param timeout Timeout.
+         * @param storeEnabled Store enabled.
+         * @param mvccOp Mvcc op.
+         * @param txSize Tx size.
+         * @param subjId Subj id.
+         * @param taskNameHash Task name hash.
+         * @param lb Label.
+         */
+        public MockGridNearTxLocal(GridCacheSharedContext ctx, boolean implicit, boolean implicitSingle, boolean sys,
+            byte plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout,
+            boolean storeEnabled, Boolean mvccOp, int txSize, @Nullable UUID subjId, int taskNameHash, @Nullable String lb) {
+            super(ctx, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, storeEnabled, mvccOp,
+                txSize, subjId, taskNameHash, lb);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void userCommit() throws IgniteCheckedException {
+            throw new IgniteCheckedException("Force failure");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 7e98ec7..7091a09 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1996,17 +1996,26 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
             final Collection<GridCacheFuture<?>> futs = ig.context().cache().context().mvcc().activeFutures();
 
-            for (GridCacheFuture<?> fut : futs)
-                log.info("Waiting for future: " + fut);
+            boolean hasFutures = false;
 
-            assertTrue("Expecting no active futures: node=" + ig.localNode().id(), futs.isEmpty());
+            for (GridCacheFuture<?> fut : futs) {
+                if (!fut.isDone()) {
+                    log.error("Expecting no active future [node=" + ig.localNode().id() + ", fut=" + fut + ']');
+
+                    hasFutures = true;
+                }
+            }
+
+            if (hasFutures)
+                fail("Some mvcc futures are not finished");
 
             Collection<IgniteInternalTx> txs = ig.context().cache().context().tm().activeTransactions();
 
             for (IgniteInternalTx tx : txs)
-                log.info("Waiting for tx: " + tx);
+                log.error("Expecting no active transaction [node=" + ig.localNode().id() + ", tx=" + tx + ']');
 
-            assertTrue("Expecting no active transactions: node=" + ig.localNode().id(), txs.isEmpty());
+            if (!txs.isEmpty())
+                fail("Some transaction are not finished");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
index 386b17b..7dba461 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite9.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimar
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCachePrimarySyncTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteTxCacheWriteSynchronizationModesMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxDataConsistencyOnCommitFailureTest;
 import org.apache.ignite.testframework.junits.GridAbstractTest;
 
 /**
@@ -49,6 +50,8 @@ public class IgniteCacheTestSuite9 extends TestSuite {
 
         suite.addTestSuite(CacheAtomicPrimarySyncBackPressureTest.class);
 
+        suite.addTestSuite(TxDataConsistencyOnCommitFailureTest.class);
+
         return suite;
     }
 }


[3/3] ignite git commit: IGNITE-9082 Throwing checked exception during tx commit without node stopping leads to data corruption - Fixes #4809.

Posted by ir...@apache.org.
IGNITE-9082 Throwing checked exception during tx commit without node stopping leads to data corruption - Fixes #4809.

Signed-off-by: Ivan Rakov <ir...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5eb871e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5eb871e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5eb871e1

Branch: refs/heads/master
Commit: 5eb871e191a14fc21f6e2c62bdfa742e27c14695
Parents: 829dc1f
Author: Aleksei Scherbakov <al...@gmail.com>
Authored: Thu Oct 18 14:52:34 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Thu Oct 18 14:52:34 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   5 -
 .../cache/GridCacheSharedContext.java           |   9 +
 .../GridDistributedTxRemoteAdapter.java         | 535 +++++++++----------
 .../distributed/dht/GridDhtTxFinishFuture.java  |  11 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  10 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   3 +
 .../distributed/dht/GridDhtTxPrepareFuture.java |  59 +-
 .../near/GridNearTxFinishFuture.java            |  38 +-
 .../cache/distributed/near/GridNearTxLocal.java |   2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  31 ++
 .../cache/transactions/IgniteTxHandler.java     | 119 ++---
 .../transactions/IgniteTxLocalAdapter.java      | 519 +++++++++---------
 .../processors/failure/FailureProcessor.java    |   8 +
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   1 -
 .../cache/GridCacheAbstractSelfTest.java        |   9 +-
 .../cache/query/IndexingSpiQuerySelfTest.java   |  66 +--
 .../cache/query/IndexingSpiQueryTxSelfTest.java |  74 +--
 .../AbstractTransactionIntergrityTest.java      | 111 ++--
 ...IntegrityWithPrimaryIndexCorruptionTest.java | 268 ++++++----
 ...ctionIntegrityWithSystemWorkerDeathTest.java |   6 +-
 .../TxDataConsistencyOnCommitFailureTest.java   | 234 ++++++++
 .../junits/common/GridCommonAbstractTest.java   |  19 +-
 .../testsuites/IgniteCacheTestSuite9.java       |   3 +
 23 files changed, 1220 insertions(+), 920 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9bb8aec..ab5b725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2906,11 +2906,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             ver = newVer;
             flags &= ~IS_EVICT_DISABLED;
 
-            if (cctx.mvccEnabled())
-                cctx.offheap().mvccRemoveAll(this);
-            else
-                removeValue();
-
             onInvalidate();
 
             return obsoleteVersionExtras() != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 52d8525..b5cd82b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -1141,4 +1141,13 @@ public class GridCacheSharedContext<K, V> {
     public void readOnlyMode(boolean readOnlyMode) {
         this.readOnlyMode = readOnlyMode;
     }
+
+    /**
+     * For test purposes.
+     * @param txMgr Tx manager.
+     */
+    public void setTxManager(IgniteTxManager txMgr) {
+        this.txMgr = txMgr;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 7313197..4db4685 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -33,7 +33,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.InvalidEnvironmentException;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -55,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpda
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -512,312 +510,267 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         batchStoreCommit(writeMap().values());
 
-                        try {
-                            // Node that for near transactions we grab all entries.
-                            for (IgniteTxEntry txEntry : entries) {
-                                GridCacheContext cacheCtx = txEntry.context();
+                        // Node that for near transactions we grab all entries.
+                        for (IgniteTxEntry txEntry : entries) {
+                            GridCacheContext cacheCtx = txEntry.context();
 
-                                boolean replicate = cacheCtx.isDrEnabled();
+                            boolean replicate = cacheCtx.isDrEnabled();
 
+                            while (true) {
                                 try {
-                                    while (true) {
-                                        try {
-                                            GridCacheEntryEx cached = txEntry.cached();
+                                    GridCacheEntryEx cached = txEntry.cached();
 
-                                            if (cached == null)
-                                                txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
+                                    if (cached == null)
+                                        txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
 
-                                            if (near() && cacheCtx.dr().receiveEnabled()) {
-                                                cached.markObsolete(xidVer);
+                                    if (near() && cacheCtx.dr().receiveEnabled()) {
+                                        cached.markObsolete(xidVer);
 
-                                                break;
-                                            }
+                                        break;
+                                    }
 
-                                            GridNearCacheEntry nearCached = null;
+                                    GridNearCacheEntry nearCached = null;
 
-                                            if (updateNearCache(cacheCtx, txEntry.key(), topVer))
-                                                nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
+                                    if (updateNearCache(cacheCtx, txEntry.key(), topVer))
+                                        nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
 
-                                            if (!F.isEmpty(txEntry.entryProcessors()))
-                                                txEntry.cached().unswap(false);
+                                    if (!F.isEmpty(txEntry.entryProcessors()))
+                                        txEntry.cached().unswap(false);
 
-                                            IgniteBiTuple<GridCacheOperation, CacheObject> res =
-                                                applyTransformClosures(txEntry, false, ret);
+                                    IgniteBiTuple<GridCacheOperation, CacheObject> res =
+                                        applyTransformClosures(txEntry, false, ret);
 
-                                            GridCacheOperation op = res.get1();
-                                            CacheObject val = res.get2();
+                                    GridCacheOperation op = res.get1();
+                                    CacheObject val = res.get2();
 
-                                            GridCacheVersion explicitVer = txEntry.conflictVersion();
+                                    GridCacheVersion explicitVer = txEntry.conflictVersion();
 
-                                            if (explicitVer == null)
-                                                explicitVer = writeVersion();
+                                    if (explicitVer == null)
+                                        explicitVer = writeVersion();
 
-                                            if (txEntry.ttl() == CU.TTL_ZERO)
-                                                op = DELETE;
+                                    if (txEntry.ttl() == CU.TTL_ZERO)
+                                        op = DELETE;
 
-                                            boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
+                                    boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
 
-                                            GridCacheVersionConflictContext conflictCtx = null;
+                                    GridCacheVersionConflictContext conflictCtx = null;
 
-                                            if (conflictNeedResolve) {
-                                                IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
-                                                    drRes = conflictResolve(op, txEntry, val, explicitVer, cached);
+                                    if (conflictNeedResolve) {
+                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
+                                            drRes = conflictResolve(op, txEntry, val, explicitVer, cached);
 
-                                                assert drRes != null;
+                                        assert drRes != null;
 
-                                                conflictCtx = drRes.get2();
+                                        conflictCtx = drRes.get2();
 
-                                                if (conflictCtx.isUseOld())
-                                                    op = NOOP;
-                                                else if (conflictCtx.isUseNew()) {
-                                                    txEntry.ttl(conflictCtx.ttl());
-                                                    txEntry.conflictExpireTime(conflictCtx.expireTime());
-                                                }
-                                                else if (conflictCtx.isMerge()) {
-                                                    op = drRes.get1();
-                                                    val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
-                                                    explicitVer = writeVersion();
+                                        if (conflictCtx.isUseOld())
+                                            op = NOOP;
+                                        else if (conflictCtx.isUseNew()) {
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
+                                        }
+                                        else if (conflictCtx.isMerge()) {
+                                            op = drRes.get1();
+                                            val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
+                                            explicitVer = writeVersion();
 
-                                                    txEntry.ttl(conflictCtx.ttl());
-                                                    txEntry.conflictExpireTime(conflictCtx.expireTime());
-                                                }
-                                            }
-                                            else
-                                                // Nullify explicit version so that innerSet/innerRemove will work as usual.
-                                                explicitVer = null;
-
-                                            GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
-
-                                            if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() &&
-                                                op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) {
-                                                if (dataEntries == null)
-                                                    dataEntries = new ArrayList<>(entries.size());
-
-                                                dataEntries.add(
-                                                        new T2<>(
-                                                                new DataEntry(
-                                                                        cacheCtx.cacheId(),
-                                                                        txEntry.key(),
-                                                                        val,
-                                                                        op,
-                                                                        nearXidVersion(),
-                                                                        writeVersion(),
-                                                                        0,
-                                                                        txEntry.key().partition(),
-                                                                        txEntry.updateCounter()
-                                                                ),
-                                                                txEntry
-                                                        )
-                                                );
-                                            }
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
+                                        }
+                                    }
+                                    else
+                                        // Nullify explicit version so that innerSet/innerRemove will work as usual.
+                                        explicitVer = null;
+
+                                    GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
+
+                                    if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() &&
+                                        op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) {
+                                        if (dataEntries == null)
+                                            dataEntries = new ArrayList<>(entries.size());
+
+                                        dataEntries.add(
+                                            new T2<>(
+                                                new DataEntry(
+                                                    cacheCtx.cacheId(),
+                                                    txEntry.key(),
+                                                    val,
+                                                    op,
+                                                    nearXidVersion(),
+                                                    writeVersion(),
+                                                    0,
+                                                    txEntry.key().partition(),
+                                                    txEntry.updateCounter()
+                                                ),
+                                                txEntry
+                                            )
+                                        );
+                                    }
 
-                                            if (op == CREATE || op == UPDATE) {
-                                                // Invalidate only for near nodes (backups cannot be invalidated).
-                                                if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
-                                                    cached.innerRemove(this,
-                                                        eventNodeId(),
-                                                        nodeId,
-                                                        false,
-                                                        true,
-                                                        true,
-                                                        txEntry.keepBinary(),
-                                                        txEntry.hasOldValue(),
-                                                        txEntry.oldValue(),
-                                                        topVer,
-                                                        null,
-                                                        replicate ? DR_BACKUP : DR_NONE,
-                                                        near() ? null : explicitVer,
-                                                        CU.subjectId(this, cctx),
-                                                        resolveTaskName(),
-                                                        dhtVer,
-                                                        txEntry.updateCounter(),
-                                                        mvccSnapshot());
-                                                else {
-                                                    assert val != null : txEntry;
-
-                                                    GridCacheUpdateTxResult updRes = cached.innerSet(this,
-                                                        eventNodeId(),
-                                                        nodeId,
-                                                        val,
-                                                        false,
-                                                        false,
-                                                        txEntry.ttl(),
-                                                        true,
-                                                        true,
-                                                        txEntry.keepBinary(),
-                                                        txEntry.hasOldValue(),
-                                                        txEntry.oldValue(),
-                                                        topVer,
-                                                        null,
-                                                        replicate ? DR_BACKUP : DR_NONE,
-                                                        txEntry.conflictExpireTime(),
-                                                        near() ? null : explicitVer,
-                                                        CU.subjectId(this, cctx),
-                                                        resolveTaskName(),
-                                                        dhtVer,
-                                                        txEntry.updateCounter(),
-                                                        mvccSnapshot());
-
-                                                    txEntry.updateCounter(updRes.updateCounter());
-
-                                                    if (updRes.loggedPointer() != null)
-                                                        ptr = updRes.loggedPointer();
-
-                                                    // Keep near entry up to date.
-                                                    if (nearCached != null) {
-                                                        CacheObject val0 = cached.valueBytes();
-
-                                                        nearCached.updateOrEvict(xidVer,
-                                                            val0,
-                                                            cached.expireTime(),
-                                                            cached.ttl(),
-                                                            nodeId,
-                                                            topVer);
-                                                    }
-                                                }
-                                            }
-                                            else if (op == DELETE) {
-                                                GridCacheUpdateTxResult updRes = cached.innerRemove(this,
-                                                    eventNodeId(),
+                                    if (op == CREATE || op == UPDATE) {
+                                        // Invalidate only for near nodes (backups cannot be invalidated).
+                                        if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
+                                            cached.innerRemove(this,
+                                                eventNodeId(),
+                                                nodeId,
+                                                false,
+                                                true,
+                                                true,
+                                                txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
+                                                topVer,
+                                                null,
+                                                replicate ? DR_BACKUP : DR_NONE,
+                                                near() ? null : explicitVer,
+                                                CU.subjectId(this, cctx),
+                                                resolveTaskName(),
+                                                dhtVer,
+                                                txEntry.updateCounter(),
+                                                mvccSnapshot());
+                                        else {
+                                            assert val != null : txEntry;
+
+                                            GridCacheUpdateTxResult updRes = cached.innerSet(this,
+                                                eventNodeId(),
+                                                nodeId,
+                                                val,
+                                                false,
+                                                false,
+                                                txEntry.ttl(),
+                                                true,
+                                                true,
+                                                txEntry.keepBinary(),
+                                                txEntry.hasOldValue(),
+                                                txEntry.oldValue(),
+                                                topVer,
+                                                null,
+                                                replicate ? DR_BACKUP : DR_NONE,
+                                                txEntry.conflictExpireTime(),
+                                                near() ? null : explicitVer,
+                                                CU.subjectId(this, cctx),
+                                                resolveTaskName(),
+                                                dhtVer,
+                                                txEntry.updateCounter(),
+                                                mvccSnapshot());
+
+                                            txEntry.updateCounter(updRes.updateCounter());
+
+                                            if (updRes.loggedPointer() != null)
+                                                ptr = updRes.loggedPointer();
+
+                                            // Keep near entry up to date.
+                                            if (nearCached != null) {
+                                                CacheObject val0 = cached.valueBytes();
+
+                                                nearCached.updateOrEvict(xidVer,
+                                                    val0,
+                                                    cached.expireTime(),
+                                                    cached.ttl(),
                                                     nodeId,
-                                                    false,
-                                                    true,
-                                                    true,
-                                                    txEntry.keepBinary(),
-                                                    txEntry.hasOldValue(),
-                                                    txEntry.oldValue(),
-                                                    topVer,
-                                                    null,
-                                                    replicate ? DR_BACKUP : DR_NONE,
-                                                    near() ? null : explicitVer,
-                                                    CU.subjectId(this, cctx),
-                                                    resolveTaskName(),
-                                                    dhtVer,
-                                                    txEntry.updateCounter(),
-                                                    mvccSnapshot());
-
-                                                txEntry.updateCounter(updRes.updateCounter());
-
-                                                if (updRes.loggedPointer() != null)
-                                                    ptr = updRes.loggedPointer();
-
-                                                // Keep near entry up to date.
-                                                if (nearCached != null)
-                                                    nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer);
-                                            }
-                                            else if (op == RELOAD) {
-                                                CacheObject reloaded = cached.innerReload();
-
-                                                if (nearCached != null) {
-                                                    nearCached.innerReload();
-
-                                                    nearCached.updateOrEvict(cached.version(),
-                                                        reloaded,
-                                                        cached.expireTime(),
-                                                        cached.ttl(),
-                                                        nodeId,
-                                                        topVer);
-                                                }
+                                                    topVer);
                                             }
-                                            else if (op == READ) {
-                                                assert near();
-
-                                                if (log.isDebugEnabled())
-                                                    log.debug("Ignoring READ entry when committing: " + txEntry);
-                                            }
-                                            // No-op.
-                                            else {
-                                                if (conflictCtx == null || !conflictCtx.isUseOld()) {
-                                                    if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
-                                                        cached.updateTtl(null, txEntry.ttl());
-
-                                                    if (nearCached != null) {
-                                                        CacheObject val0 = cached.valueBytes();
-
-                                                        nearCached.updateOrEvict(xidVer,
-                                                            val0,
-                                                            cached.expireTime(),
-                                                            cached.ttl(),
-                                                            nodeId,
-                                                            topVer);
-                                                    }
-                                                }
-                                            }
-
-                                            // Assert after setting values as we want to make sure
-                                            // that if we replaced removed entries.
-                                            assert
-                                                txEntry.op() == READ || onePhaseCommit() ||
-                                                    // If candidate is not there, then lock was explicit
-                                                    // and we simply allow the commit to proceed.
-                                                    !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
-                                                "Transaction does not own lock for commit [entry=" + cached +
-                                                    ", tx=" + this + ']';
-
-                                            // Break out of while loop.
-                                            break;
-                                        }
-                                        catch (GridCacheEntryRemovedException ignored) {
-                                            if (log.isDebugEnabled())
-                                                log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
-
-                                            // Renew cached entry.
-                                            txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
                                         }
                                     }
-                                }
-                                catch (Throwable ex) {
-                                    boolean isNodeStopping = X.hasCause(ex, NodeStoppingException.class);
-                                    boolean hasInvalidEnvironmentIssue = X.hasCause(ex, InvalidEnvironmentException.class);
-
-                                    // In case of error, we still make the best effort to commit,
-                                    // as there is no way to rollback at this point.
-                                    err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
-                                        "(all transaction entries will be invalidated): " + CU.txString(this), ex);
-
-                                    if (isNodeStopping) {
-                                        U.warn(log, "Failed to commit transaction, node is stopping [tx=" + this +
-                                            ", err=" + ex + ']');
-                                    }
-                                    else if (hasInvalidEnvironmentIssue) {
-                                        U.warn(log, "Failed to commit transaction, node is in invalid state and will be stopped [tx=" + this +
-                                            ", err=" + ex + ']');
+                                    else if (op == DELETE) {
+                                        GridCacheUpdateTxResult updRes = cached.innerRemove(this,
+                                            eventNodeId(),
+                                            nodeId,
+                                            false,
+                                            true,
+                                            true,
+                                            txEntry.keepBinary(),
+                                            txEntry.hasOldValue(),
+                                            txEntry.oldValue(),
+                                            topVer,
+                                            null,
+                                            replicate ? DR_BACKUP : DR_NONE,
+                                            near() ? null : explicitVer,
+                                            CU.subjectId(this, cctx),
+                                            resolveTaskName(),
+                                            dhtVer,
+                                            txEntry.updateCounter(),
+                                            mvccSnapshot());
+
+                                        txEntry.updateCounter(updRes.updateCounter());
+
+                                        if (updRes.loggedPointer() != null)
+                                            ptr = updRes.loggedPointer();
+
+                                        // Keep near entry up to date.
+                                        if (nearCached != null)
+                                            nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer);
                                     }
-                                    else
-                                        U.error(log, "Commit failed.", err);
-
-                                    state(UNKNOWN);
-
-                                    if (hasInvalidEnvironmentIssue)
-                                        cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
-                                    else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or invalidation.
-                                        try {
-                                            // Courtesy to minimize damage.
-                                            uncommit();
+                                    else if (op == RELOAD) {
+                                        CacheObject reloaded = cached.innerReload();
+
+                                        if (nearCached != null) {
+                                            nearCached.innerReload();
+
+                                            nearCached.updateOrEvict(cached.version(),
+                                                reloaded,
+                                                cached.expireTime(),
+                                                cached.ttl(),
+                                                nodeId,
+                                                topVer);
                                         }
-                                        catch (Throwable ex1) {
-                                            U.error(log, "Failed to uncommit transaction: " + this, ex1);
+                                    }
+                                    else if (op == READ) {
+                                        assert near();
 
-                                            if (ex1 instanceof Error)
-                                                throw ex1;
+                                        if (log.isDebugEnabled())
+                                            log.debug("Ignoring READ entry when committing: " + txEntry);
+                                    }
+                                    // No-op.
+                                    else {
+                                        if (conflictCtx == null || !conflictCtx.isUseOld()) {
+                                            if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+                                                cached.updateTtl(null, txEntry.ttl());
+
+                                            if (nearCached != null) {
+                                                CacheObject val0 = cached.valueBytes();
+
+                                                nearCached.updateOrEvict(xidVer,
+                                                    val0,
+                                                    cached.expireTime(),
+                                                    cached.ttl(),
+                                                    nodeId,
+                                                    topVer);
+                                            }
                                         }
                                     }
 
-                                    if (ex instanceof Error)
-                                        throw (Error) ex;
+                                    // Assert after setting values as we want to make sure
+                                    // that if we replaced removed entries.
+                                    assert
+                                        txEntry.op() == READ || onePhaseCommit() ||
+                                            // If candidate is not there, then lock was explicit
+                                            // and we simply allow the commit to proceed.
+                                            !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
+                                        "Transaction does not own lock for commit [entry=" + cached +
+                                            ", tx=" + this + ']';
+
+                                    // Break out of while loop.
+                                    break;
+                                }
+                                catch (GridCacheEntryRemovedException ignored) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
 
-                                    throw err;
+                                    // Renew cached entry.
+                                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
                                 }
                             }
+                        }
 
-                            // Apply cache size deltas.
-                            applyTxSizes();
+                        // Apply cache size deltas.
+                        applyTxSizes();
 
-                            TxCounters txCntrs = txCounters(false);
+                        TxCounters txCntrs = txCounters(false);
 
-                            // Apply update counters.
-                            if (txCntrs != null)
-                                applyPartitionsUpdatesCounters(txCntrs.updateCounters());
+                        // Apply update counters.
+                        if (txCntrs != null)
+                            applyPartitionsUpdatesCounters(txCntrs.updateCounters());
 
                             cctx.mvccCaching().onTxFinished(this, true);
 
@@ -827,18 +780,32 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
                                         .collect(Collectors.toList());
 
-                                cctx.wal().log(new DataRecord(entriesWithCounters));
-                            }
+                            cctx.wal().log(new DataRecord(entriesWithCounters));
+                        }
+
+                        if (ptr != null && !cctx.tm().logTxRecords())
+                            cctx.wal().flush(ptr, false);
+                    }
+                    catch (Throwable ex) {
+                        state(UNKNOWN);
 
-                            if (ptr != null && !cctx.tm().logTxRecords())
-                                cctx.wal().flush(ptr, false);
+                        if (X.hasCause(ex, NodeStoppingException.class)) {
+                            U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) +
+                                ", err=" + ex + ']');
+
+                            return;
                         }
-                        catch (StorageException e) {
-                            err = e;
 
-                            throw new IgniteCheckedException("Failed to log transaction record " +
-                                "(transaction will be rolled back): " + this, e);
+                        err = heuristicException(ex);
+
+                        try {
+                            uncommit();
+                        }
+                        catch (Throwable e) {
+                            err.addSuppressed(e);
                         }
+
+                        throw err;
                     }
                     finally {
                         cctx.database().checkpointReadUnlock();
@@ -878,9 +845,19 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                 throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
 
             rollbackRemoteTx();
+
+            return;
         }
 
-        commitIfLocked();
+        try {
+            commitIfLocked();
+        }
+        catch (IgniteTxHeuristicCheckedException e) {
+            // Treat heuristic exception as critical.
+            cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+            throw e;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 21eb7b2..9f96b46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -173,10 +173,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
         if (ERR_UPD.compareAndSet(this, null, e)) {
             tx.setRollbackOnly();
 
-            if (X.hasCause(e, InvalidEnvironmentException.class, NodeStoppingException.class))
+            if (X.hasCause(e, NodeStoppingException.class) || cctx.kernalContext().failure().nodeStopping())
                 onComplete();
-            else
+            else {
+                // Rolling back a remote transaction may result in partial commit.
+                // This is only acceptable in tests with no-op failure handler.
                 finish(false);
+            }
         }
     }
 
@@ -230,9 +233,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
 
             if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
                 try {
-                    boolean hasInvalidEnvironmentIssue = X.hasCause(err, InvalidEnvironmentException.class, NodeStoppingException.class);
+                    boolean nodeStopping = X.hasCause(err, NodeStoppingException.class);
 
-                    this.tx.tmFinish(err == null, hasInvalidEnvironmentIssue, false);
+                    this.tx.tmFinish(err == null, nodeStopping || cctx.kernalContext().failure().nodeStopping(), false);
                 }
                 catch (IgniteCheckedException finishErr) {
                     U.error(log, "Failed to finish tx: " + tx, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index a091d44..ca451f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -46,6 +49,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -467,7 +471,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                     ", tx=" + CU.txString(this) + ']');
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to finish transaction [commit=" + commit + ", tx=" + this + ']', e);
+            logTxFinishErrorSafe(log, commit, e);
+
+            // Treat heuristic exception as critical.
+            if (X.hasCause(e, IgniteTxHeuristicCheckedException.class))
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
 
             err = e;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index ffa383b..483990f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
@@ -43,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.TxCounters;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 741faee..c505677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -36,6 +36,8 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -740,8 +742,6 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
 
             if (tx.commitOnPrepare()) {
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
-                    IgniteInternalFuture<IgniteInternalTx> fut = null;
-
                     CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo =
                         new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
                             @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
@@ -753,42 +753,43 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                             }
                         };
 
-                    if (prepErr == null) {
-                        try {
-                            fut = tx.commitAsync();
-                        }
-                        catch (RuntimeException | Error e) {
-                            Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " +
-                                "exception: " + CU.txString(tx), e);
-
-                            res.error(hEx);
+                    try {
+                        if (prepErr == null) {
+                            try {
+                                tx.commitAsync().listen(resClo);
+                            }
+                            catch (Throwable e) {
+                                res.error(e);
 
-                            tx.systemInvalidate(true);
+                                tx.systemInvalidate(true);
 
-                            try {
-                                fut = tx.rollbackAsync();
+                                try {
+                                    tx.rollbackAsync().listen(resClo);
+                                }
+                                catch (Throwable e1) {
+                                    e.addSuppressed(e1);
+                                }
 
-                                fut.listen(resClo);
+                                throw e;
                             }
-                            catch (Throwable e1) {
-                                e.addSuppressed(e1);
+                        }
+                        else if (!cctx.kernalContext().isStopping()) {
+                            try {
+                                tx.rollbackAsync().listen(resClo);
                             }
+                            catch (Throwable e) {
+                                if (err != null)
+                                    err.addSuppressed(e);
 
-                            throw e;
+                                throw err;
+                            }
                         }
-
                     }
-                    else if (!cctx.kernalContext().isStopping())
-                        try {
-                            fut = tx.rollbackAsync();
-                        }
-                        catch (Throwable e) {
-                            err.addSuppressed(e);
-                            fut = null;
-                        }
+                    catch (Throwable e){
+                        tx.logTxFinishErrorSafe(log, true, e);
 
-                    if (fut != null)
-                        fut.listen(resClo);
+                        cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+                    }
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 4a4d8e3..befa305 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -311,7 +311,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             if (err != null) {
                 tx.setRollbackOnly();
 
-                nodeStop = err instanceof NodeStoppingException;
+                nodeStop = err instanceof NodeStoppingException || cctx.kernalContext().failure().nodeStopping();
             }
 
             if (commit) {
@@ -357,29 +357,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                 }
 
                 if (super.onDone(tx0, err)) {
-                    if (error() instanceof IgniteTxHeuristicCheckedException && !nodeStop) {
-                        AffinityTopologyVersion topVer = tx.topologyVersion();
-
-                        for (IgniteTxEntry e : tx.writeMap().values()) {
-                            GridCacheContext cacheCtx = e.context();
-
-                            try {
-                                if (e.op() != NOOP && !cacheCtx.affinity().keyLocalNode(e.key(), topVer)) {
-                                    GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
-
-                                    if (entry != null)
-                                        entry.invalidate(tx.xidVersion());
-                                }
-                            }
-                            catch (Throwable t) {
-                                U.error(log, "Failed to invalidate entry.", t);
-
-                                if (t instanceof Error)
-                                    throw (Error)t;
-                            }
-                        }
-                    }
-
                     // Don't forget to clean up.
                     cctx.mvcc().removeFuture(futId);
 
@@ -402,8 +379,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
     }
 
     /** {@inheritDoc} */
-    @Override @SuppressWarnings("ForLoopReplaceableByForEach")
-    public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) {
+    @Override public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) {
         if (!cctx.mvcc().addFuture(this, futureId()))
             return;
 
@@ -490,18 +466,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                     }
                 }
 
+                // Cleanup transaction if heuristic failure.
+                if (tx.state() == UNKNOWN)
+                    cctx.tm().rollbackTx(tx, clearThreadMap, false);
+
                 if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) {
                     if (mappings.single()) {
                         GridDistributedTxMapping mapping = mappings.singleMapping();
 
                         if (mapping != null) {
-                            assert !hasFutures() || waitTxs != null : futures();
+                            assert !hasFutures() || isDone() || waitTxs != null : futures();
 
                             finish(1, mapping, commit, !clearThreadMap);
                         }
                     }
                     else {
-                        assert !hasFutures() || waitTxs != null : futures();
+                        assert !hasFutures() || isDone() || waitTxs != null : futures();
 
                         finish(mappings.mappings(), commit, !clearThreadMap);
                     }
@@ -762,7 +742,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
     /**
      * @param mappings Mappings.
      * @param commit Commit flag.
-     * @param {@code true} If need to add completed version on finish.
+     * @param useCompletedVer {@code True} if need to add completed version on finish.
      */
     private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit, boolean useCompletedVer) {
         int miniId = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 76d464e..f56d99b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3955,7 +3955,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
                                 assert rollbackFut.isDone() : rollbackFut;
                             }
-                            else
+                            else // First finish attempt was unsuccessful. Try again.
                                 rollbackFut.finish(false, clearThreadMap, onTimeout);
                         }
                         else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b091061..0d3ba75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
+import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridSetWrapper;
@@ -764,6 +765,36 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
             "[timeout=" + timeout() + ", tx=" + CU.txString(this) + ']');
     }
 
+    /**
+     * @param ex Root cause.
+     */
+    public final IgniteCheckedException heuristicException(Throwable ex) {
+        return new IgniteTxHeuristicCheckedException("Committing a transaction has produced runtime exception", ex);
+    }
+
+    /**
+     * @param log Log.
+     * @param commit Commit.
+     * @param e Exception.
+     */
+    public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Throwable e) {
+        assert e != null : "Exception is expected";
+
+        final String fmt = "Failed completing the transaction: [commit=%s, tx=%s, plc=%s]";
+
+        try {
+            // First try printing a full transaction. This is error prone.
+            U.error(log, String.format(fmt, commit, this,
+                cctx.gridConfig().getFailureHandler().getClass().getSimpleName()), e);
+        }
+        catch (Throwable e0) {
+            e.addSuppressed(e0);
+
+            U.error(log, String.format(fmt, commit, CU.txString(this),
+                cctx.gridConfig().getFailureHandler().getClass().getSimpleName()), e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public GridCacheVersion xidVersion() {
         return xidVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 4c7b65d..895a9d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1030,45 +1030,34 @@ public class IgniteTxHandler {
         }
         catch (Throwable e) {
             try {
-                U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
-            }
-            catch (Throwable e0) {
-                ClusterNode node0 = ctx.discovery().node(nodeId);
-
-                U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" +
-                        CU.txString(tx) + ']', e);
-
-                U.error(log, "Failed to log message due to an error: ", e0);
+                if (tx != null) {
+                    tx.commitError(e);
 
-                if (node0 != null && (!node0.isClient() || node0.isLocal())) {
-                    ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+                    tx.systemInvalidate(true);
 
-                    throw e;
-                }
-            }
-
-            if (tx != null) {
-                tx.commitError(e);
-
-                tx.systemInvalidate(true);
+                    try {
+                        IgniteInternalFuture<IgniteInternalTx> res = tx.rollbackDhtLocalAsync();
 
-                try {
-                    IgniteInternalFuture<IgniteInternalTx> res = tx.rollbackDhtLocalAsync();
+                        // Only for error logging.
+                        res.listen(CU.errorLogger(log));
 
-                    // Only for error logging.
-                    res.listen(CU.errorLogger(log));
+                        return res;
+                    }
+                    catch (Throwable e1) {
+                        e.addSuppressed(e1);
+                    }
 
-                    return res;
+                    tx.logTxFinishErrorSafe(log, req.commit(), e);
                 }
-                catch (Throwable e1) {
-                    e.addSuppressed(e1);
-                }
-            }
 
-            if (e instanceof Error)
-                throw (Error)e;
+                if (e instanceof Error)
+                    throw (Error)e;
 
-            return new GridFinishedFuture<>(e);
+                return new GridFinishedFuture<>(e);
+            }
+            finally {
+                ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
         }
     }
 
@@ -1093,20 +1082,26 @@ public class IgniteTxHandler {
                 return tx.rollbackAsyncLocal();
         }
         catch (Throwable e) {
-            U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e);
-
-            if (e instanceof Error)
-                throw e;
+            try {
+                if (tx != null) {
+                    try {
+                        return tx.rollbackNearTxLocalAsync();
+                    }
+                    catch (Throwable e1) {
+                        e.addSuppressed(e1);
+                    }
 
-            if (tx != null)
-                try {
-                    return tx.rollbackNearTxLocalAsync();
-                }
-                catch (Throwable e1) {
-                    e.addSuppressed(e1);
+                    tx.logTxFinishErrorSafe(log, commit, e);
                 }
 
-            return new GridFinishedFuture<>(e);
+                if (e instanceof Error)
+                    throw e;
+
+                return new GridFinishedFuture<>(e);
+            }
+            finally {
+                ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
         }
     }
 
@@ -1193,10 +1188,6 @@ public class IgniteTxHandler {
                 if (log.isDebugEnabled())
                     log.debug("Optimistic failure for remote transaction (will rollback): " + req);
             }
-            else if (e instanceof IgniteTxHeuristicCheckedException) {
-                U.warn(log, "Failed to commit transaction (all transaction entries were invalidated): " +
-                    CU.txString(dhtTx));
-            }
             else
                 U.error(log, "Failed to process prepare request: " + req, e);
 
@@ -1421,9 +1412,10 @@ public class IgniteTxHandler {
                 tx.rollbackRemoteTx();
             }
         }
+        catch (IgniteTxHeuristicCheckedException e) {
+            // Already uncommitted.
+        }
         catch (Throwable e) {
-            U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
-
             // Mark transaction for invalidate.
             tx.invalidate(true);
             tx.systemInvalidate(true);
@@ -1441,6 +1433,8 @@ public class IgniteTxHandler {
     }
 
     /**
+     * Finish for one-phase distributed tx.
+     *
      * @param tx Transaction.
      * @param req Request.
      */
@@ -1464,22 +1458,27 @@ public class IgniteTxHandler {
             throw e;
         }
         catch (Throwable e) {
-            U.error(log, "Failed committing transaction [tx=" + tx + ']', e);
+            try {
+                // Mark transaction for invalidate.
+                tx.invalidate(true);
 
-            // Mark transaction for invalidate.
-            tx.invalidate(true);
-            tx.systemInvalidate(true);
+                tx.systemInvalidate(true);
 
-            try {
-                tx.rollbackRemoteTx();
+                try {
+                    tx.rollbackRemoteTx();
+                }
+                catch (Throwable e1) {
+                    e.addSuppressed(e1);
+                }
+
+                tx.logTxFinishErrorSafe(log, true, e);
+
+                if (e instanceof Error)
+                    throw (Error)e;
             }
-            catch (Throwable e1) {
-                e.addSuppressed(e1);
-                U.error(log, "Failed to automatically rollback transaction: " + tx, e1);
+            finally {
+                ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
             }
-
-            if (e instanceof Error)
-                throw (Error)e;
         }
     }
 


[2/3] ignite git commit: IGNITE-9082 Throwing checked exception during tx commit without node stopping leads to data corruption - Fixes #4809.

Posted by ir...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 775b61c..7e04292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -558,7 +558,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass"})
-    @Override public final void userCommit() throws IgniteCheckedException {
+    @Override public void userCommit() throws IgniteCheckedException {
         TransactionState state = state();
 
         if (state != COMMITTING) {
@@ -590,7 +590,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
             WALPointer ptr = null;
 
-            Exception err = null;
+            IgniteCheckedException err = null;
 
             cctx.database().checkpointReadLock();
 
@@ -609,176 +609,175 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
                     UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId();
 
-                    try {
-                        while (true) {
-                            try {
-                                GridCacheEntryEx cached = txEntry.cached();
+                    while (true) {
+                        try {
+                            GridCacheEntryEx cached = txEntry.cached();
 
-                                // Must try to evict near entries before committing from
-                                // transaction manager to make sure locks are held.
-                                if (!evictNearEntry(txEntry, false)) {
-                                    if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
-                                        cached.markObsolete(xidVer);
+                            // Must try to evict near entries before committing from
+                            // transaction manager to make sure locks are held.
+                            if (!evictNearEntry(txEntry, false)) {
+                                if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
+                                    cached.markObsolete(xidVer);
 
-                                        break;
-                                    }
+                                    break;
+                                }
 
-                                    if (cached.detached())
-                                        break;
+                                if (cached.detached())
+                                    break;
 
-                                    boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer);
+                                boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer);
 
-                                    boolean metrics = true;
+                                boolean metrics = true;
 
-                                    if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped())
-                                        metrics = false;
+                                if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped())
+                                    metrics = false;
 
-                                    boolean evt = !isNearLocallyMapped(txEntry, false);
+                                boolean evt = !isNearLocallyMapped(txEntry, false);
 
-                                    if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
-                                        txEntry.cached().unswap(false);
+                                if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
+                                    txEntry.cached().unswap(false);
 
-                                    IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
-                                        true, null);
+                                IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
+                                    true, null);
 
-                                    GridCacheVersion dhtVer = null;
+                                GridCacheVersion dhtVer = null;
 
-                                    // For near local transactions we must record DHT version
-                                    // in order to keep near entries on backup nodes until
-                                    // backup remote transaction completes.
-                                    if (cacheCtx.isNear()) {
-                                        if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
-                                            txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
-                                            dhtVer = txEntry.dhtVersion();
+                                // For near local transactions we must record DHT version
+                                // in order to keep near entries on backup nodes until
+                                // backup remote transaction completes.
+                                if (cacheCtx.isNear()) {
+                                    if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
+                                        txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
+                                        dhtVer = txEntry.dhtVersion();
 
-                                        if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
-                                            txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
-                                            ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+                                    if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
+                                        txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
+                                        ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
 
-                                            if (expiry != null) {
-                                                txEntry.cached().unswap(false);
+                                        if (expiry != null) {
+                                            txEntry.cached().unswap(false);
 
-                                                Duration duration = cached.hasValue() ?
-                                                    expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
+                                            Duration duration = cached.hasValue() ?
+                                                expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
 
-                                                txEntry.ttl(CU.toTtl(duration));
-                                            }
+                                            txEntry.ttl(CU.toTtl(duration));
                                         }
                                     }
+                                }
 
-                                    GridCacheOperation op = res.get1();
-                                    CacheObject val = res.get2();
+                                GridCacheOperation op = res.get1();
+                                CacheObject val = res.get2();
 
-                                    // Deal with conflicts.
-                                    GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
-                                        txEntry.conflictVersion() : writeVersion();
+                                // Deal with conflicts.
+                                GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
+                                    txEntry.conflictVersion() : writeVersion();
 
-                                    if ((op == CREATE || op == UPDATE) &&
-                                        txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
-                                        ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+                                if ((op == CREATE || op == UPDATE) &&
+                                    txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
+                                    ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
 
-                                        if (expiry != null) {
-                                            Duration duration = cached.hasValue() ?
-                                                expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
+                                    if (expiry != null) {
+                                        Duration duration = cached.hasValue() ?
+                                            expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
 
-                                            long ttl = CU.toTtl(duration);
+                                        long ttl = CU.toTtl(duration);
 
-                                            txEntry.ttl(ttl);
+                                        txEntry.ttl(ttl);
 
-                                            if (ttl == CU.TTL_ZERO)
-                                                op = DELETE;
-                                        }
+                                        if (ttl == CU.TTL_ZERO)
+                                            op = DELETE;
                                     }
+                                }
 
-                                    boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
-
-                                    GridCacheVersionConflictContext<?, ?> conflictCtx = null;
-
-                                    if (conflictNeedResolve) {
-                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes =
-                                            conflictResolve(op, txEntry, val, explicitVer, cached);
+                                boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
 
-                                        assert conflictRes != null;
+                                GridCacheVersionConflictContext<?, ?> conflictCtx = null;
 
-                                        conflictCtx = conflictRes.get2();
+                                if (conflictNeedResolve) {
+                                    IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes =
+                                        conflictResolve(op, txEntry, val, explicitVer, cached);
 
-                                        if (conflictCtx.isUseOld())
-                                            op = NOOP;
-                                        else if (conflictCtx.isUseNew()) {
-                                            txEntry.ttl(conflictCtx.ttl());
-                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
-                                        }
-                                        else {
-                                            assert conflictCtx.isMerge();
+                                    assert conflictRes != null;
 
-                                            op = conflictRes.get1();
-                                            val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
-                                            explicitVer = writeVersion();
+                                    conflictCtx = conflictRes.get2();
 
-                                            txEntry.ttl(conflictCtx.ttl());
-                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
-                                        }
+                                    if (conflictCtx.isUseOld())
+                                        op = NOOP;
+                                    else if (conflictCtx.isUseNew()) {
+                                        txEntry.ttl(conflictCtx.ttl());
+                                        txEntry.conflictExpireTime(conflictCtx.expireTime());
                                     }
-                                    else
-                                        // Nullify explicit version so that innerSet/innerRemove will work as usual.
-                                        explicitVer = null;
+                                    else {
+                                        assert conflictCtx.isMerge();
 
-                                    if (sndTransformedVals || conflictNeedResolve) {
-                                        assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve;
+                                        op = conflictRes.get1();
+                                        val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
+                                        explicitVer = writeVersion();
 
-                                        txEntry.value(val, true, false);
-                                        txEntry.op(op);
-                                        txEntry.entryProcessors(null);
-                                        txEntry.conflictVersion(explicitVer);
+                                        txEntry.ttl(conflictCtx.ttl());
+                                        txEntry.conflictExpireTime(conflictCtx.expireTime());
                                     }
+                                }
+                                else
+                                    // Nullify explicit version so that innerSet/innerRemove will work as usual.
+                                    explicitVer = null;
 
-                                    if (dhtVer == null)
-                                        dhtVer = explicitVer != null ? explicitVer : writeVersion();
+                                if (sndTransformedVals || conflictNeedResolve) {
+                                    assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve;
 
-                                    if (op == CREATE || op == UPDATE) {
-                                        assert val != null : txEntry;
+                                    txEntry.value(val, true, false);
+                                    txEntry.op(op);
+                                    txEntry.entryProcessors(null);
+                                    txEntry.conflictVersion(explicitVer);
+                                }
 
-                                        GridCacheUpdateTxResult updRes = cached.innerSet(
-                                            this,
-                                            eventNodeId(),
-                                            txEntry.nodeId(),
-                                            val,
-                                            false,
-                                            false,
-                                            txEntry.ttl(),
-                                            evt,
-                                            metrics,
-                                            txEntry.keepBinary(),
-                                            txEntry.hasOldValue(),
-                                            txEntry.oldValue(),
-                                            topVer,
-                                            null,
-                                            cached.detached() ? DR_NONE : drType,
-                                            txEntry.conflictExpireTime(),
-                                            cached.isNear() ? null : explicitVer,
-                                            CU.subjectId(this, cctx),
-                                            resolveTaskName(),
-                                            dhtVer,
-                                            null,
-                                            mvccSnapshot());
-
-                                        if (updRes.success()) {
-                                            txEntry.updateCounter(updRes.updateCounter());
-
-                                            GridLongList waitTxs = updRes.mvccWaitTransactions();
-
-                                            updateWaitTxs(waitTxs);
-                                        }
+                                if (dhtVer == null)
+                                    dhtVer = explicitVer != null ? explicitVer : writeVersion();
+
+                                if (op == CREATE || op == UPDATE) {
+                                    assert val != null : txEntry;
+
+                                    GridCacheUpdateTxResult updRes = cached.innerSet(
+                                        this,
+                                        eventNodeId(),
+                                        txEntry.nodeId(),
+                                        val,
+                                        false,
+                                        false,
+                                        txEntry.ttl(),
+                                        evt,
+                                        metrics,
+                                        txEntry.keepBinary(),
+                                        txEntry.hasOldValue(),
+                                        txEntry.oldValue(),
+                                        topVer,
+                                        null,
+                                        cached.detached() ? DR_NONE : drType,
+                                        txEntry.conflictExpireTime(),
+                                        cached.isNear() ? null : explicitVer,
+                                        CU.subjectId(this, cctx),
+                                        resolveTaskName(),
+                                        dhtVer,
+                                        null,
+                                        mvccSnapshot());
+
+                                    if (updRes.success()) {
+                                        txEntry.updateCounter(updRes.updateCounter());
+
+                                        GridLongList waitTxs = updRes.mvccWaitTransactions();
+
+                                        updateWaitTxs(waitTxs);
+                                    }
 
-                                        if (updRes.loggedPointer() != null)
-                                            ptr = updRes.loggedPointer();
+                                    if (updRes.loggedPointer() != null)
+                                        ptr = updRes.loggedPointer();
 
-                                        if (updRes.success() && updateNearCache) {
-                                            final CacheObject val0 = val;
-                                            final boolean metrics0 = metrics;
-                                            final GridCacheVersion dhtVer0 = dhtVer;
+                                    if (updRes.success() && updateNearCache) {
+                                        final CacheObject val0 = val;
+                                        final boolean metrics0 = metrics;
+                                        final GridCacheVersion dhtVer0 = dhtVer;
 
-                                            updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet(
+                                        updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet(
                                                 null,
                                                 eventNodeId(),
                                                 nodeId,
@@ -801,46 +800,46 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 dhtVer0,
                                                 null,
                                                 mvccSnapshot())
-                                            );
-                                        }
+                                        );
+                                    }
+                                }
+                                else if (op == DELETE) {
+                                    GridCacheUpdateTxResult updRes = cached.innerRemove(
+                                        this,
+                                        eventNodeId(),
+                                        txEntry.nodeId(),
+                                        false,
+                                        evt,
+                                        metrics,
+                                        txEntry.keepBinary(),
+                                        txEntry.hasOldValue(),
+                                        txEntry.oldValue(),
+                                        topVer,
+                                        null,
+                                        cached.detached() ? DR_NONE : drType,
+                                        cached.isNear() ? null : explicitVer,
+                                        CU.subjectId(this, cctx),
+                                        resolveTaskName(),
+                                        dhtVer,
+                                        null,
+                                        mvccSnapshot());
+
+                                    if (updRes.success()) {
+                                        txEntry.updateCounter(updRes.updateCounter());
+
+                                        GridLongList waitTxs = updRes.mvccWaitTransactions();
+
+                                        updateWaitTxs(waitTxs);
                                     }
-                                    else if (op == DELETE) {
-                                        GridCacheUpdateTxResult updRes = cached.innerRemove(
-                                            this,
-                                            eventNodeId(),
-                                            txEntry.nodeId(),
-                                            false,
-                                            evt,
-                                            metrics,
-                                            txEntry.keepBinary(),
-                                            txEntry.hasOldValue(),
-                                            txEntry.oldValue(),
-                                            topVer,
-                                            null,
-                                            cached.detached() ? DR_NONE : drType,
-                                            cached.isNear() ? null : explicitVer,
-                                            CU.subjectId(this, cctx),
-                                            resolveTaskName(),
-                                            dhtVer,
-                                            null,
-                                            mvccSnapshot());
-
-                                        if (updRes.success()) {
-                                            txEntry.updateCounter(updRes.updateCounter());
-
-                                            GridLongList waitTxs = updRes.mvccWaitTransactions();
-
-                                            updateWaitTxs(waitTxs);
-                                        }
 
-                                        if (updRes.loggedPointer() != null)
-                                            ptr = updRes.loggedPointer();
+                                    if (updRes.loggedPointer() != null)
+                                        ptr = updRes.loggedPointer();
 
-                                        if (updRes.success() && updateNearCache) {
-                                            final boolean metrics0 = metrics;
-                                            final GridCacheVersion dhtVer0 = dhtVer;
+                                    if (updRes.success() && updateNearCache) {
+                                        final boolean metrics0 = metrics;
+                                        final GridCacheVersion dhtVer0 = dhtVer;
 
-                                            updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove(
+                                        updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove(
                                                 null,
                                                 eventNodeId(),
                                                 nodeId,
@@ -859,125 +858,78 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 dhtVer0,
                                                 null,
                                                 mvccSnapshot())
-                                            );
-                                        }
+                                        );
                                     }
-                                    else if (op == RELOAD) {
-                                        cached.innerReload();
+                                }
+                                else if (op == RELOAD) {
+                                    cached.innerReload();
 
-                                        if (updateNearCache)
-                                            updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload());
+                                    if (updateNearCache)
+                                        updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload());
+                                }
+                                else if (op == READ) {
+                                    CacheGroupContext grp = cacheCtx.group();
+
+                                    if (grp.persistenceEnabled() && grp.walEnabled() &&
+                                        cctx.snapshot().needTxReadLogging()) {
+                                        ptr = cctx.wal().log(new DataRecord(new DataEntry(
+                                            cacheCtx.cacheId(),
+                                            txEntry.key(),
+                                            val,
+                                            op,
+                                            nearXidVersion(),
+                                            writeVersion(),
+                                            0,
+                                            txEntry.key().partition(),
+                                            txEntry.updateCounter())));
                                     }
-                                    else if (op == READ) {
-                                        CacheGroupContext grp = cacheCtx.group();
-
-                                        if (grp.persistenceEnabled() && grp.walEnabled() &&
-                                            cctx.snapshot().needTxReadLogging()) {
-                                            ptr = cctx.wal().log(new DataRecord(new DataEntry(
-                                                cacheCtx.cacheId(),
-                                                txEntry.key(),
-                                                val,
-                                                op,
-                                                nearXidVersion(),
-                                                writeVersion(),
-                                                0,
-                                                txEntry.key().partition(),
-                                                txEntry.updateCounter())));
-                                        }
 
-                                        ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+                                    ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
 
-                                        if (expiry != null) {
-                                            Duration duration = expiry.getExpiryForAccess();
+                                    if (expiry != null) {
+                                        Duration duration = expiry.getExpiryForAccess();
 
-                                            if (duration != null)
-                                                cached.updateTtl(null, CU.toTtl(duration));
-                                        }
-
-                                        if (log.isDebugEnabled())
-                                            log.debug("Ignoring READ entry when committing: " + txEntry);
+                                        if (duration != null)
+                                            cached.updateTtl(null, CU.toTtl(duration));
                                     }
-                                    else {
-                                        assert ownsLock(txEntry.cached()) :
-                                            "Transaction does not own lock for group lock entry during  commit [tx=" +
-                                                this + ", txEntry=" + txEntry + ']';
 
-                                        if (conflictCtx == null || !conflictCtx.isUseOld()) {
-                                            if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
-                                                cached.updateTtl(null, txEntry.ttl());
-                                        }
-
-                                        if (log.isDebugEnabled())
-                                            log.debug("Ignoring NOOP entry when committing: " + txEntry);
-                                    }
+                                    if (log.isDebugEnabled())
+                                        log.debug("Ignoring READ entry when committing: " + txEntry);
                                 }
+                                else {
+                                    assert ownsLock(txEntry.cached()) :
+                                        "Transaction does not own lock for group lock entry during  commit [tx=" +
+                                            this + ", txEntry=" + txEntry + ']';
+
+                                    if (conflictCtx == null || !conflictCtx.isUseOld()) {
+                                        if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+                                            cached.updateTtl(null, txEntry.ttl());
+                                    }
 
-                                // Check commit locks after set, to make sure that
-                                // we are not changing obsolete entries.
-                                // (innerSet and innerRemove will throw an exception
-                                // if an entry is obsolete).
-                                if (txEntry.op() != READ)
-                                    checkCommitLocks(cached);
-
-                                // Break out of while loop.
-                                break;
-                            }
-                            // If entry cached within transaction got removed.
-                            catch (GridCacheEntryRemovedException ignored) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
-
-                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion()));
+                                    if (log.isDebugEnabled())
+                                        log.debug("Ignoring NOOP entry when committing: " + txEntry);
+                                }
                             }
-                        }
-                    }
-                    catch (Throwable ex) {
-                        // We are about to initiate transaction rollback when tx has started to committing.
-                        // Need to remove version from committed list.
-                        cctx.tm().removeCommittedTx(this);
 
-                        boolean isNodeStopping = X.hasCause(ex, NodeStoppingException.class);
-                        boolean hasInvalidEnvironmentIssue = X.hasCause(ex, InvalidEnvironmentException.class);
+                            // Check commit locks after set, to make sure that
+                            // we are not changing obsolete entries.
+                            // (innerSet and innerRemove will throw an exception
+                            // if an entry is obsolete).
+                            if (txEntry.op() != READ)
+                                checkCommitLocks(cached);
 
-                        IgniteCheckedException err0 = new IgniteTxHeuristicCheckedException("Failed to locally write to cache " +
-                            "(all transaction entries will be invalidated, however there was a window when " +
-                            "entries for this transaction were visible to others): " + this, ex);
-
-                        if (isNodeStopping) {
-                            U.warn(log, "Failed to commit transaction, node is stopping [tx=" + this +
-                                ", err=" + ex + ']');
-                        }
-                        else if (hasInvalidEnvironmentIssue) {
-                            U.warn(log, "Failed to commit transaction, node is in invalid state and will be stopped [tx=" + this +
-                                ", err=" + ex + ']');
+                            // Break out of while loop.
+                            break;
                         }
-                        else
-                            U.error(log, "Commit failed.", err0);
-
-                        COMMIT_ERR_UPD.compareAndSet(this, null, err0);
+                        // If entry cached within transaction got removed.
+                        catch (GridCacheEntryRemovedException ignored) {
+                            if (log.isDebugEnabled())
+                                log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
 
-                        state(UNKNOWN);
-
-                        if (hasInvalidEnvironmentIssue)
-                            cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
-                        else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or invalidation.
-                            try {
-                                // Courtesy to minimize damage.
-                                uncommit();
-                            }
-                            catch (Throwable ex1) {
-                                U.error(log, "Failed to uncommit transaction: " + this, ex1);
-
-                                if (ex1 instanceof Error)
-                                    throw ex1;
-                            }
+                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion()));
                         }
-
-                        if (ex instanceof Error)
-                            throw ex;
-
-                        throw err0;
                     }
+
                 }
 
                 // Apply cache sizes only for primary nodes. Update counters were applied on prepare state.
@@ -988,11 +940,32 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 if (ptr != null && !cctx.tm().logTxRecords())
                     cctx.wal().flush(ptr, false);
             }
-            catch (StorageException e) {
-                err = e;
+            catch (Throwable ex) {
+                // We are about to initiate transaction rollback when tx has started to committing.
+                // Need to remove version from committed list.
+                cctx.tm().removeCommittedTx(this);
+
+                if (X.hasCause(ex, NodeStoppingException.class)) {
+                    U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) +
+                        ", err=" + ex + ']');
+
+                    return;
+                }
+
+                err = heuristicException(ex);
+
+                COMMIT_ERR_UPD.compareAndSet(this, null, err);
+
+                state(UNKNOWN);
+
+                try {
+                    uncommit();
+                }
+                catch (Throwable e) {
+                    err.addSuppressed(e);
+                }
 
-                throw new IgniteCheckedException("Failed to log transaction record " +
-                    "(transaction will be rolled back): " + this, e);
+                throw err;
             }
             finally {
                 cctx.database().checkpointReadUnlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
index f31f0e9..ceeb4e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.NoOpFailureHandler;
 import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -79,6 +80,13 @@ public class FailureProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return @{code True} if a node will be stopped by current handler in near time.
+     */
+    public boolean nodeStopping() {
+        return failureCtx != null && !(hnd instanceof NoOpFailureHandler);
+    }
+
+    /**
      * This method is used to initialize local failure handler if {@link IgniteConfiguration} don't contain configured one.
      *
      * @return Default {@link FailureHandler} implementation.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index a7e6e8c..f68ecd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index facea69..e69aff8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -256,7 +256,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
             cfg.setIndexedTypes(idxTypes);
 
         if (cacheMode() == PARTITIONED)
-            cfg.setBackups(1);
+            cfg.setBackups(backups());
 
         return cfg;
     }
@@ -362,6 +362,13 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @return Backups.
+     */
+    protected int backups() {
+        return 1;
+    }
+
+    /**
      * @param idx Index of grid.
      * @return Default cache.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
index 5f2e2ed..3e59c2a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java
@@ -26,12 +26,10 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import javax.cache.Cache;
-import junit.framework.TestCase;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -47,6 +45,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingSpi;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -57,25 +56,19 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Indexing Spi query only test
  */
-public class IndexingSpiQuerySelfTest extends TestCase {
-    public static final String CACHE_NAME = "test-cache";
+public class IndexingSpiQuerySelfTest extends GridCommonAbstractTest {
+    private IndexingSpi indexingSpi;
 
     /** {@inheritDoc} */
-    @Override public void tearDown() throws Exception {
-        Ignition.stopAll(true);
-    }
-
-    /**
-     * @return Configuration.
-     */
-    protected IgniteConfiguration configuration() {
-        IgniteConfiguration cfg = new IgniteConfiguration();
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
 
         disco.setIpFinder(ipFinder);
 
+        cfg.setIndexingSpi(indexingSpi);
         cfg.setDiscoverySpi(disco);
 
         return cfg;
@@ -86,17 +79,22 @@ public class IndexingSpiQuerySelfTest extends TestCase {
         return new CacheConfiguration<>(cacheName);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testSimpleIndexingSpi() throws Exception {
-        IgniteConfiguration cfg = configuration();
-
-        cfg.setIndexingSpi(new MyIndexingSpi());
+        indexingSpi = new MyIndexingSpi();
 
-        Ignite ignite = Ignition.start(cfg);
+        Ignite ignite = startGrid(0);
 
-        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
 
         IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
 
@@ -113,13 +111,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
      * @throws Exception If failed.
      */
     public void testIndexingSpiWithDisabledQueryProcessor() throws Exception {
-        IgniteConfiguration cfg = configuration();
-
-        cfg.setIndexingSpi(new MyIndexingSpi());
+        indexingSpi = new MyIndexingSpi();
 
-        Ignite ignite = Ignition.start(cfg);
+        Ignite ignite = startGrid(0);
 
-        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
 
         IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg);
 
@@ -136,13 +132,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
      * @throws Exception If failed.
      */
     public void testBinaryIndexingSpi() throws Exception {
-        IgniteConfiguration cfg = configuration();
+        indexingSpi = new MyBinaryIndexingSpi();
 
-        cfg.setIndexingSpi(new MyBinaryIndexingSpi());
+        Ignite ignite = startGrid(0);
 
-        Ignite ignite = Ignition.start(cfg);
-
-        CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
+        CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
 
         IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
 
@@ -168,13 +162,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
     public void testNonBinaryIndexingSpi() throws Exception {
         System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true");
 
-        IgniteConfiguration cfg = configuration();
-
-        cfg.setIndexingSpi(new MyIndexingSpi());
+        indexingSpi = new MyIndexingSpi();
 
-        Ignite ignite = Ignition.start(cfg);
+        Ignite ignite = startGrid(0);
 
-        CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME);
+        CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
 
         IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg);
 
@@ -198,13 +190,11 @@ public class IndexingSpiQuerySelfTest extends TestCase {
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     public void testIndexingSpiFailure() throws Exception {
-        IgniteConfiguration cfg = configuration();
-
-        cfg.setIndexingSpi(new MyBrokenIndexingSpi());
+        indexingSpi = new MyBrokenIndexingSpi();
 
-        Ignite ignite = Ignition.start(cfg);
+        Ignite ignite = startGrid(0);
 
-        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME);
+        CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME);
 
         ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
index e59deed..ca80b13 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -37,61 +42,64 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
-
 /**
  * Indexing Spi transactional query test
  */
 public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
-    /** */
-    private static AtomicInteger cnt;
-
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 4;
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        cnt = new AtomicInteger();
-
-        super.beforeTestsStarted();
-    }
-
-    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
 
-        if (cnt.getAndIncrement() == 0)
-            cfg.setClientMode(true);
-        else {
-            cfg.setIndexingSpi(new MyBrokenIndexingSpi());
+        cfg.setClientMode("client".equals(igniteInstanceName));
+        cfg.setIndexingSpi(new MyBrokenIndexingSpi());
 
-            CacheConfiguration ccfg = cacheConfiguration(igniteInstanceName);
-            ccfg.setName("test-cache");
-            ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        CacheConfiguration ccfg = cacheConfiguration(igniteInstanceName);
+        ccfg.setName(DEFAULT_CACHE_NAME);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
 
-            ccfg.setIndexedTypes(Integer.class, Integer.class);
+        ccfg.setIndexedTypes(Integer.class, Integer.class);
+
+        cfg.setCacheConfiguration(ccfg);
 
-            cfg.setCacheConfiguration(ccfg);
-        }
         return cfg;
     }
 
+    /** */
+    public void testIndexingSpiWithTxClient() throws Exception {
+        IgniteEx client = startGrid("client");
+
+        assertNotNull(client.cache(DEFAULT_CACHE_NAME));
+
+        doTestIndexingSpiWithTx(client, 0);
+    }
+
+    /** */
+    public void testIndexingSpiWithTxLocal() throws Exception {
+        IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME);
+
+        doTestIndexingSpiWithTx(ignite, 0);
+    }
+
+    /** */
+    public void testIndexingSpiWithTxNotLocal() throws Exception {
+        IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME);
+
+        doTestIndexingSpiWithTx(ignite, 1);
+    }
+
     /**
      * @throws Exception If failed.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public void testIndexingSpiWithTx() throws Exception {
-        IgniteEx ignite = grid(0);
-
-        final IgniteCache<Integer, Integer> cache = ignite.cache("test-cache");
+    private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception {
+        final IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
         final IgniteTransactions txs = ignite.transactions();
 
@@ -104,7 +112,7 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
                         Transaction tx;
 
                         try (Transaction tx0 = tx = txs.txStart(concurrency, isolation)) {
-                            cache.put(1, 1);
+                            cache.put(key, key);
 
                             tx0.commit();
                         }
@@ -114,6 +122,8 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
                         return null;
                     }
                 }, IgniteTxHeuristicCheckedException.class);
+
+                checkFutures();
             }
         }
     }
@@ -135,7 +145,7 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest {
         /** {@inheritDoc} */
         @Override public Iterator<Cache.Entry<?, ?>> query(@Nullable String cacheName, Collection<Object> params,
             @Nullable IndexingQueryFilter filters) throws IgniteSpiException {
-           return null;
+            return null;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
index fe27e6e..01db747 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
@@ -40,10 +41,10 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -73,7 +74,7 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
     private static final int DFLT_ACCOUNTS_CNT = 32;
 
     /** Count of threads and caches. */
-    private static final int DFLT_TX_THREADS_CNT = 20;
+    private static final int DFLT_TX_THREADS_CNT = Runtime.getRuntime().availableProcessors();
 
     /** Count of nodes to start. */
     private static final int DFLT_NODES_CNT = 3;
@@ -126,16 +127,6 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
         return true;
     }
 
-    /**
-     * @return Flag enables cross-node transactions,
-     *         when primary partitions participating in transaction spreaded across several cluster nodes.
-     */
-    protected boolean crossNodeTransactions() {
-        // Commit error during cross node transactions breaks transaction integrity
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-9086
-        return false;
-    }
-
     /** {@inheritDoc} */
     @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
         return new StopNodeFailureHandler();
@@ -148,14 +139,15 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
         cfg.setConsistentId(name);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-        cfg.setLocalHost("127.0.0.1");
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
-                .setMaxSize(256 * 1024 * 1024)
-                .setPersistenceEnabled(persistent()))
-        );
+                    .setPersistenceEnabled(persistent())
+                    .setMaxSize(50 * 1024 * 1024)
+            )
+            .setWalSegmentSize(16 * 1024 * 1024)
+            .setPageSize(1024)
+            .setWalMode(WALMode.LOG_ONLY));
 
         CacheConfiguration[] cacheConfigurations = new CacheConfiguration[txThreadsCount()];
 
@@ -178,6 +170,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(cacheConfigurations);
 
+        cfg.setFailureDetectionTimeout(30_000);
+
         return cfg;
     }
 
@@ -219,8 +213,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
 
     /**
      * Test transfer amount.
+     *
+     * @param failoverScenario Scenario.
+     * @param colocatedAccounts {@code True} to use colocated on same primary node accounts.
      */
-    public void doTestTransferAmount(FailoverScenario failoverScenario) throws Exception {
+    public void doTestTransferAmount(FailoverScenario failoverScenario, boolean colocatedAccounts) throws Exception {
         failoverScenario.beforeNodesStarted();
 
         //given: started some nodes with client.
@@ -230,26 +227,26 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
 
         igniteClient.cluster().active(true);
 
-        int[] initAmount = new int[txThreadsCount()];
+        int[] initAmounts = new int[txThreadsCount()];
         completedTxs = new ConcurrentLinkedHashMap[txThreadsCount()];
 
         //and: fill all accounts on all caches and calculate total amount for every cache.
         for (int cachePrefixIdx = 0; cachePrefixIdx < txThreadsCount(); cachePrefixIdx++) {
             IgniteCache<Integer, AccountState> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx));
 
-            AtomicInteger coinsCounter = new AtomicInteger();
+            AtomicInteger coinsCntr = new AtomicInteger();
 
             try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                 for (int accountId = 0; accountId < accountsCount(); accountId++) {
-                    Set<Integer> initialAmount = generateCoins(coinsCounter, 5);
+                    Set<Integer> initAmount = generateCoins(coinsCntr, 5);
 
-                    cache.put(accountId, new AccountState(accountId, tx.xid(), initialAmount));
+                    cache.put(accountId, new AccountState(accountId, tx.xid(), initAmount));
                 }
 
                 tx.commit();
             }
 
-            initAmount[cachePrefixIdx] = coinsCounter.get();
+            initAmounts[cachePrefixIdx] = coinsCntr.get();
             completedTxs[cachePrefixIdx] = new ConcurrentLinkedHashMap();
         }
 
@@ -259,7 +256,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
         ArrayList<Thread> transferThreads = new ArrayList<>();
 
         for (int i = 0; i < txThreadsCount(); i++) {
-            transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i), i));
+            transferThreads.add(new TransferAmountTxThread(firstTransactionDone,
+                igniteClient, cacheName(i), i, colocatedAccounts));
 
             transferThreads.get(i).start();
         }
@@ -268,13 +266,12 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
 
         failoverScenario.afterFirstTransaction();
 
-        for (Thread thread : transferThreads) {
+        for (Thread thread : transferThreads)
             thread.join();
-        }
 
         failoverScenario.afterTransactionsFinished();
 
-        consistencyCheck(initAmount);
+        consistencyCheck(initAmounts);
     }
 
     /**
@@ -385,11 +382,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
 
         /**
          * @param txId Transaction id.
-         * @param coinsToRemove Coins to remove from current account.
+         * @param coinsToRmv Coins to remove from current account.
          * @return Account state with removed coins.
          */
-        public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRemove) {
-            return new AccountState(accId, txId, Sets.difference(coins, coinsToRemove).immutableCopy());
+        public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRmv) {
+            return new AccountState(accId, txId, Sets.difference(coins, coinsToRmv).immutableCopy());
         }
 
         /** {@inheritDoc} */
@@ -418,11 +415,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
     /**
      * @param coinsNum Coins number.
      */
-    private Set<Integer> generateCoins(AtomicInteger coinsCounter, int coinsNum) {
+    private Set<Integer> generateCoins(AtomicInteger coinsCntr, int coinsNum) {
         Set<Integer> res = new HashSet<>();
 
         for (int i = 0; i < coinsNum; i++)
-            res.add(coinsCounter.incrementAndGet());
+            res.add(coinsCntr.incrementAndGet());
 
         return res;
     }
@@ -479,23 +476,35 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
     private class TransferAmountTxThread extends Thread {
         /** */
         private CountDownLatch firstTransactionLatch;
+
         /** */
         private IgniteEx ignite;
+
         /** */
         private String cacheName;
+
         /** */
-        private int txIndex;
+        private int workerIdx;
+
         /** */
         private Random random = new Random();
 
+        /** */
+        private final boolean colocatedAccounts;
+
         /**
          * @param ignite Ignite.
          */
-        private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final IgniteEx ignite, String cacheName, int txIndex) {
+        private TransferAmountTxThread(CountDownLatch firstTransactionLatch,
+            final IgniteEx ignite,
+            String cacheName,
+            int workerIdx,
+            boolean colocatedAccounts) {
             this.firstTransactionLatch = firstTransactionLatch;
             this.ignite = ignite;
             this.cacheName = cacheName;
-            this.txIndex = txIndex;
+            this.workerIdx = workerIdx;
+            this.colocatedAccounts = colocatedAccounts;
         }
 
         /** {@inheritDoc} */
@@ -514,7 +523,6 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
         /**
          * @throws IgniteException if fails
          */
-        @SuppressWarnings("unchecked")
         private void updateInTransaction(IgniteCache<Integer, AccountState> cache) throws IgniteException {
             int accIdFrom;
             int accIdTo;
@@ -526,11 +534,16 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
                 if (accIdFrom == accIdTo)
                     continue;
 
-                ClusterNode primaryForAccFrom = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdFrom);
-                ClusterNode primaryForAccTo = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdTo);
+                Affinity<Object> affinity = ignite.affinity(cacheName);
+
+                ClusterNode primaryForAccFrom = affinity.mapKeyToNode(accIdFrom);
+                assertNotNull(primaryForAccFrom);
+
+                ClusterNode primaryForAccTo = affinity.mapKeyToNode(accIdTo);
+                assertNotNull(primaryForAccTo);
 
                 // Allows only transaction between accounts that primary on the same node if corresponding flag is enabled.
-                if (!crossNodeTransactions() && !primaryForAccFrom.id().equals(primaryForAccTo.id()))
+                if (colocatedAccounts && !primaryForAccFrom.id().equals(primaryForAccTo.id()))
                     continue;
 
                 break;
@@ -541,7 +554,10 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
 
             try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                 acctFrom = cache.get(accIdFrom);
+                assertNotNull(acctFrom);
+
                 acctTo = cache.get(accIdTo);
+                assertNotNull(acctTo);
 
                 Set<Integer> coinsToTransfer = acctFrom.coinsToTransfer(random);
 
@@ -553,23 +569,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest {
 
                 tx.commit();
 
-                completedTxs[txIndex].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer));
-            }
-        }
-
-        /**
-         * @param curr current
-         * @return random value
-         */
-        private long getNextAccountId(long curr) {
-            long randomVal;
-
-            do {
-                randomVal = random.nextInt(accountsCount());
+                completedTxs[workerIdx].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer));
             }
-            while (curr == randomVal);
-
-            return randomVal;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
index 3260607..473eaf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java
@@ -17,20 +17,26 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.Ignition;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
 import org.apache.ignite.internal.processors.cache.tree.SearchRow;
 import org.apache.ignite.testframework.GridTestUtils;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+
 /**
  * Test cases that check transaction data integrity after transaction commit failed.
  */
@@ -45,81 +51,96 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract
         super.afterTest();
     }
 
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 60 * 1000L;
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsError() throws Exception {
+        doTestTransferAmount0(true, true, () -> new AssertionError("Test"));
     }
 
-    /**
-     * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
-     */
-    public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode1() throws Exception {
-        doTestTransferAmount(new IndexCorruptionFailoverScenario(
-            true,
-            (hnd, tree) -> hnd instanceof BPlusTree.Search,
-            failoverPredicate(true, () -> new AssertionError("Test")))
-        );
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsUnchecked() throws Exception {
+        doTestTransferAmount0(true, true, () -> new RuntimeException("Test"));
     }
 
-    /**
-     * Throws a test {@link RuntimeException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
-     */
-    public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode2() throws Exception {
-        doTestTransferAmount(new IndexCorruptionFailoverScenario(
-            true,
-            (hnd, tree) -> hnd instanceof BPlusTree.Search,
-            failoverPredicate(true, () -> new RuntimeException("Test")))
-        );
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsChecked() throws Exception {
+        doTestTransferAmount0(true, true, () -> new IgniteCheckedException("Test"));
     }
 
-    /**
-     * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent.
-     */
-    public void testPrimaryIndexCorruptionDuringCommitOnBackupNode() throws Exception {
-        doTestTransferAmount(new IndexCorruptionFailoverScenario(
-            true,
-            (hnd, tree) -> hnd instanceof BPlusTree.Search,
-            failoverPredicate(false, () -> new AssertionError("Test")))
-        );
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsError() throws Exception {
+        doTestTransferAmount0(false, true, () -> new AssertionError("Test"));
     }
 
-    /**
-     * Throws a test {@link IgniteCheckedException} during tx commit from {@link BPlusTree} and checks after that data is consistent.
-     */
-    public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode3() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-9082");
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsUnchecked() throws Exception {
+        doTestTransferAmount0(false, true, () -> new RuntimeException("Test"));
+    }
 
-        doTestTransferAmount(new IndexCorruptionFailoverScenario(
-            false,
-            (hnd, tree) -> hnd instanceof BPlusTree.Search,
-            failoverPredicate(true, () -> new IgniteCheckedException("Test")))
-        );
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsChecked() throws Exception {
+        doTestTransferAmount0(false, true, () -> new IgniteCheckedException("Test"));
+    }
+
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsError() throws Exception {
+        doTestTransferAmount0(true, false, () -> new AssertionError("Test"));
+    }
+
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsUnchecked() throws Exception {
+        doTestTransferAmount0(true, false, () -> new RuntimeException("Test"));
+    }
+
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsChecked() throws Exception {
+        doTestTransferAmount0(true, false, () -> new IgniteCheckedException("Test"));
+    }
+
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsError() throws Exception {
+        doTestTransferAmount0(false, false, () -> new AssertionError("Test"));
+    }
+
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsUnchecked() throws Exception {
+        doTestTransferAmount0(false, false, () -> new RuntimeException("Test"));
+    }
+
+    /** */
+    public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsChecked() throws Exception {
+        doTestTransferAmount0(false, false, () -> new IgniteCheckedException("Test"));
     }
 
     /**
      * Creates failover predicate which generates error during transaction commmit.
      *
-     * @param failOnPrimary If {@code true} index should be failed on transaction primary node.
+     * @param failOnPrimary If {@code true} index should be failed on transaction primary node, otherwise on backup.
      * @param errorSupplier Supplier to create various errors.
+     * @param errorConsumer Consumer to track unexpected errors while committing.
      */
     private BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate(
         boolean failOnPrimary,
-        Supplier<Throwable> errorSupplier
+        Supplier<Throwable> errorSupplier,
+        Consumer<Throwable> errorConsumer
     ) {
         return (ignite, row) -> {
-            int cacheId = row.cacheId();
-            int partId = row.key().partition();
-
-            final ClusterNode locNode = ignite.localNode();
-            final AffinityTopologyVersion curTopVer = ignite.context().discovery().topologyVersionEx();
-
-            // Throw exception if current node is primary for given row.
-            return ignite.cachesx(c -> c.context().cacheId() == cacheId)
-                .stream()
-                .filter(c -> c.context().affinity().primaryByPartition(locNode, partId, curTopVer) == failOnPrimary)
-                .map(c -> errorSupplier.get())
-                .findFirst()
-                .orElse(null);
+            try {
+                int cacheId = row.cacheId();
+                int partId = row.key().partition();
+
+                GridDhtPartitionTopology top = ignite.context().cache().cacheGroup(cacheId).topology();
+
+                GridDhtLocalPartition part = top.localPartition(partId);
+
+                assertTrue("Illegal partition state for mapped tx: " + part, part != null && part.state() == OWNING);
+
+                return part.primary(top.readyTopologyVersion()) == failOnPrimary ? errorSupplier.get() : null;
+            }
+            catch (Throwable e) {
+                errorConsumer.accept(e);
+
+                throw e;
+            }
         };
     }
 
@@ -130,68 +151,68 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract
         /** Failed node index. */
         static final int failedNodeIdx = 1;
 
-        /** Is node stopping expected after failover. */
-        private final boolean nodeStoppingExpected;
-
-        /** Predicate that will choose an instance of {@link BPlusTree} and page operation
-         * to make further failover in this tree using {@link #failoverPredicate}. */
-        private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate;
+        /**
+         * Predicate that will choose an instance of {@link BPlusTree} and page operation to make further failover in
+         * this tree using {@link #failoverPred}.
+         */
+        private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPred;
 
         /** Function that may return error during row insertion into {@link BPlusTree}. */
-        private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate;
+        private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPred;
 
         /**
-         * @param nodeStoppingExpected Node stopping expected.
-         * @param treeCorruptionPredicate Tree corruption predicate.
-         * @param failoverPredicate Failover predicate.
+         * @param treeCorruptionPred Tree corruption predicate.
+         * @param failoverPred Failover predicate.
          */
         IndexCorruptionFailoverScenario(
-            boolean nodeStoppingExpected,
-            BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate,
-            BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate
+            BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPred,
+            BiFunction<IgniteEx, SearchRow, Throwable> failoverPred
         ) {
-            this.nodeStoppingExpected = nodeStoppingExpected;
-            this.treeCorruptionPredicate = treeCorruptionPredicate;
-            this.failoverPredicate = failoverPredicate;
+            this.treeCorruptionPred = treeCorruptionPred;
+            this.failoverPred = failoverPred;
         }
 
         /** {@inheritDoc} */
         @Override public void beforeNodesStarted() {
             BPlusTree.pageHndWrapper = (tree, hnd) -> {
-                final IgniteEx locIgnite = (IgniteEx) Ignition.localIgnite();
+                final IgniteEx locIgnite = (IgniteEx)Ignition.localIgnite();
 
-                if (!locIgnite.name().endsWith(String.valueOf(failedNodeIdx)))
+                if (getTestIgniteInstanceIndex(locIgnite.name()) != failedNodeIdx)
                     return hnd;
 
-                if (treeCorruptionPredicate.apply(hnd, tree)) {
-                    log.info("Created corrupted tree handler for -> " + hnd + " " + tree);
+                if (treeCorruptionPred.apply(hnd, tree)) {
+                    log.info("Created corrupted tree handler [nodeOrder=" + locIgnite.localNode().order() + ", hnd=" + hnd +
+                        ", tree=" + tree + ']');
 
-                    PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>) hnd;
+                    PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>)hnd;
 
                     return new PageHandler<BPlusTree.Get, BPlusTree.Result>() {
-                        @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io, Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException {
-                            log.info("Invoked " + " " + cacheId + " " + arg.toString() + " for BTree (" + corruptionEnabled + ") -> " + arg.row() + " / " + arg.row().getClass());
+                        @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io,
+                            Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException {
+                            log.info("Invoked [cachedId=" + cacheId + ", hnd=" + arg.toString() +
+                                ", corruption=" + corruptionEnabled + ", row=" + arg.row() + ", rowCls=" + arg.row().getClass() + ']');
 
                             if (corruptionEnabled && (arg.row() instanceof SearchRow)) {
-                                SearchRow row = (SearchRow) arg.row();
+                                SearchRow row = (SearchRow)arg.row();
 
                                 // Store cacheId to search row explicitly, as it can be zero if there is one cache in a group.
-                                Throwable res = failoverPredicate.apply(locIgnite, new SearchRow(cacheId, row.key()));
+                                Throwable res = failoverPred.apply(locIgnite, new SearchRow(cacheId, row.key()));
 
                                 if (res != null) {
                                     if (res instanceof Error)
-                                        throw (Error) res;
+                                        throw (Error)res;
                                     else if (res instanceof RuntimeException)
-                                        throw (RuntimeException) res;
+                                        throw (RuntimeException)res;
                                     else if (res instanceof IgniteCheckedException)
-                                        throw (IgniteCheckedException) res;
+                                        throw (IgniteCheckedException)res;
                                 }
                             }
 
                             return delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, lvl);
                         }
 
-                        @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr, BPlusTree.Get g, int lvl) {
+                        @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr,
+                            BPlusTree.Get g, int lvl) {
                             return g.canRelease(pageId, lvl);
                         }
                     };
@@ -212,27 +233,68 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract
             // Disable index corruption.
             BPlusTree.pageHndWrapper = (tree, hnd) -> hnd;
 
-            if (nodeStoppingExpected) {
-                // Wait until node with corrupted index will left cluster.
-                GridTestUtils.waitForCondition(() -> {
-                    try {
-                        grid(failedNodeIdx);
-                    }
-                    catch (IgniteIllegalStateException e) {
-                        return true;
-                    }
+            // Wait until node with corrupted index will left cluster.
+            GridTestUtils.waitForCondition(() -> {
+                try {
+                    grid(failedNodeIdx);
+                }
+                catch (IgniteIllegalStateException e) {
+                    return true;
+                }
 
-                    return false;
-                }, getTestTimeout());
+                return false;
+            }, getTestTimeout());
 
-                // Failed node should be stopped.
-                GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, "");
+            // Failed node should be stopped.
+            GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, null);
 
-                // Re-start failed node.
-                startGrid(failedNodeIdx);
+            // Re-start failed node.
+            startGrid(failedNodeIdx);
 
-                awaitPartitionMapExchange();
-            }
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * Test transfer amount with extended error recording.
+     *
+     * @param colocatedAccount Colocated account.
+     * @param failOnPrimary {@code True} if fail on primary, else on backup.
+     * @param supplier Fail reason supplier.
+     * @throws Exception If failover predicate execution is failed.
+     */
+    private void doTestTransferAmount0(boolean colocatedAccount, boolean failOnPrimary, Supplier<Throwable> supplier) throws Exception {
+        ErrorTracker errTracker = new ErrorTracker();
+
+        doTestTransferAmount(
+            new IndexCorruptionFailoverScenario(
+                (hnd, tree) -> hnd instanceof BPlusTree.Search,
+                failoverPredicate(failOnPrimary, supplier, errTracker)),
+            colocatedAccount
+        );
+
+        for (Throwable throwable : errTracker.errors())
+            log.error("Recorded error", throwable);
+
+        if (!errTracker.errors().isEmpty())
+            fail("Test run has error");
+    }
+
+    /** */
+    private static class ErrorTracker implements Consumer<Throwable> {
+        /** Queue. */
+        private final Queue<Throwable> q = new ConcurrentLinkedQueue<>();
+
+        /** {@inheritDoc} */
+        @Override public void accept(Throwable throwable) {
+            q.add(throwable);
+        }
+
+        /**
+         * @return Recorded errors.
+         */
+        public Collection<Throwable> errors() {
+            return q;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
index 25aae4b..551335f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java
@@ -41,9 +41,7 @@ public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTrans
         return false;
     }
 
-    /**
-     *
-     */
+    /** */
     public void testFailoverWithDiscoWorkerTermination() throws Exception {
         doTestTransferAmount(new FailoverScenario() {
             static final int failedNodeIdx = 1;
@@ -83,7 +81,7 @@ public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTrans
 
                 awaitPartitionMapExchange();
             }
-        });
+        }, true);
     }
 
     /**