You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/15 14:37:19 UTC

[1/3] ignite git commit: ignite-4768 txs

Repository: ignite
Updated Branches:
  refs/heads/ignite-4768-1 684d66c0e -> 2272fadc5


http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
new file mode 100644
index 0000000..cfe9029
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests tx recovery when the near node (and optionally the primary) is stopped while tx messages are blocked.
+ */
+public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
+    /** IP finder shared by all nodes started by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Backing map of the test cache store (see {@link TestStoreFactory}); cleared in {@link #afterTest()}. */
+    private static ConcurrentHashMap<Object, Object> storeMap = new ConcurrentHashMap<>();
+
+    /** Client mode flag applied in {@link #getConfiguration(String)}. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setConsistentId(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        // Recording SPI is what the tests use to block and wait for specific messages (see testSpi usages).
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try {
+            for (Ignite node : G.allGrids()) {
+                Collection<IgniteInternalTx> txs = ((IgniteKernal)node).context().cache().context().tm().txs();
+                // No tx may remain on any node returned by G.allGrids() once the test body has finished.
+                assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty());
+            }
+        }
+        finally {
+            stopAllGrids();
+
+            storeMap.clear();
+
+            super.afterTest();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx1Implicit() throws Exception {
+        nearTx1(null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx1Optimistic() throws Exception {
+        nearTx1(OPTIMISTIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx1Pessimistic() throws Exception {
+        nearTx1(PESSIMISTIC);
+    }
+
+    /**
+     * Stop tx near node (client2), near cache tx on client1 is either committed
+     * by primary or invalidated.
+     *
+     * @param concurrency Tx concurrency or {@code null} for implicit transaction.
+     * @throws Exception If failed.
+     */
+    private void nearTx1(final TransactionConcurrency concurrency) throws Exception {
+        startGrids(4);
+
+        Ignite srv0 = grid(0);
+
+        final IgniteCache<Integer, Integer> srvCache = srv0.createCache(cacheConfiguration(2, false, false));
+
+        awaitPartitionMapExchange();
+
+        client = true;
+
+        Ignite client1 = startGrid(4);
+        final Ignite client2 = startGrid(5);
+
+        final Integer key = primaryKey(srv0.cache(null));
+
+        final IgniteCache<Integer, Integer> cache1 =
+            client1.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        final IgniteCache<Integer, Integer> cache2 =
+            client2.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        cache1.put(key, 1);
+
+        final Integer newVal = 2;
+        // Block the finish request from client2 to srv0; the async put below is then expected not to complete.
+        testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name());
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start put, concurrency: " + concurrency);
+
+                if (concurrency != null) {
+                    try (Transaction tx = client2.transactions().txStart(concurrency, REPEATABLE_READ)) {
+                        cache2.put(key, newVal);
+
+                        tx.commit();
+                    }
+                }
+                else
+                    cache2.put(key, newVal);
+
+                return null;
+            }
+        });
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone());
+
+        testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+        // Stop the tx near node while its finish request is still blocked.
+        stopGrid(client2.name());
+
+        try {
+            fut.get();
+        }
+        catch (IgniteCheckedException ignore) {
+            // No-op.
+        }
+        // Wait (timeout 5000) for the new value on server and near cache; result ignored, checkData() asserts below.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return newVal.equals(srvCache.get(key)) && newVal.equals(cache1.get(key));
+            }
+        }, 5000);
+
+        checkData(F.asMap(key, newVal));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx2Implicit() throws Exception {
+        nearTx2(null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx2Optimistic() throws Exception {
+        nearTx2(OPTIMISTIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTx2Pessimistic() throws Exception {
+        nearTx2(PESSIMISTIC);
+    }
+
+    /**
+     * Stop both tx near node (client2) and primary node, near cache tx on client1 is invalidated.
+     *
+     * @param concurrency Tx concurrency or {@code null} for implicit transaction.
+     * @throws Exception If failed.
+     */
+    private void nearTx2(final TransactionConcurrency concurrency) throws Exception {
+        startGrids(4);
+
+        Ignite srv0 = grid(0);
+
+        srv0.createCache(cacheConfiguration(2, false, false));
+
+        awaitPartitionMapExchange();
+
+        client = true;
+
+        Ignite client1 = startGrid(4);
+        final Ignite client2 = startGrid(5);
+
+        final Integer key = primaryKey(srv0.cache(null));
+
+        final IgniteCache<Integer, Integer> cache1 =
+            client1.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        final IgniteCache<Integer, Integer> cache2 =
+            client2.createNearCache(null, new NearCacheConfiguration<Integer, Integer>());
+
+        cache1.put(key, 1);
+
+        final Integer newVal = 2;
+        // Block client2's finish request to srv0, then every GridDhtTxFinishRequest sent by srv0.
+        testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name());
+
+        testSpi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() {
+            @Override public boolean apply(GridIoMessage msg) {
+                return msg.message() instanceof GridDhtTxFinishRequest;
+            }
+        });
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start put, concurrency: " + concurrency);
+
+                if (concurrency != null) {
+                    try (Transaction tx = client2.transactions().txStart(concurrency, REPEATABLE_READ)) {
+                        cache2.put(key, newVal);
+
+                        tx.commit();
+                    }
+                }
+                else
+                    cache2.put(key, newVal);
+
+                return null;
+            }
+        });
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone());
+
+        testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+        // Stop both the tx near node (client2) and srv0, the node the key was chosen as primary for.
+        stopGrid(client2.name());
+        stopGrid(srv0.name());
+
+        try {
+            fut.get();
+        }
+        catch (IgniteCheckedException ignore) {
+            // No-op.
+        }
+
+        final IgniteCache<Integer, Integer> srvCache = grid(1).cache(null);
+        // Wait (timeout 5000) for the new value on grid(1) and near cache; result ignored, checkData() asserts below.
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return newVal.equals(srvCache.get(key)) && newVal.equals(cache1.get(key));
+            }
+        }, 5000);
+
+        checkData(F.asMap(key, newVal));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWithStoreImplicit() throws Exception {
+        txWithStore(null, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWithStoreOptimistic() throws Exception {
+        txWithStore(OPTIMISTIC, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWithStorePessimistic() throws Exception {
+        txWithStore(PESSIMISTIC, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWithStoreNoWriteThroughImplicit() throws Exception {
+        txWithStore(null, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWithStoreNoWriteThroughOptimistic() throws Exception {
+        txWithStore(OPTIMISTIC, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxWithStoreNoWriteThroughPessimistic() throws Exception {
+        txWithStore(PESSIMISTIC, false);
+    }
+
+    /**
+     * @param concurrency Tx concurrency or {@code null} for implicit transaction. NOTE(review): unused below.
+     * @param writeThrough Store write through flag.
+     * @throws Exception If failed.
+     */
+    private void txWithStore(final TransactionConcurrency concurrency, boolean writeThrough) throws Exception {
+        startGrids(4);
+
+        Ignite srv0 = grid(0);
+
+        IgniteCache<Integer, Integer> srv0Cache = srv0.createCache(cacheConfiguration(1, true, writeThrough));
+
+        awaitPartitionMapExchange();
+
+        final Integer key = primaryKey(srv0Cache);
+
+        srv0Cache.put(key, 1);
+
+        client = true;
+        // NOTE: the local 'client' declared below shadows the boolean field assigned above.
+        Ignite client = startGrid(4);
+        // Block the prepare response from srv0 to the client; the async put below is then expected not to complete.
+        testSpi(srv0).blockMessages(GridNearTxPrepareResponse.class, client.name());
+
+        final IgniteCache<Integer, Integer> clientCache = client.cache(null);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start put");
+                // NOTE(review): always an implicit put; 'concurrency' is ignored, unlike nearTx1()/nearTx2().
+                clientCache.put(key, 2);
+
+                return null;
+            }
+        });
+
+        U.sleep(500);
+
+        assertFalse(fut.isDone());
+
+        testSpi(srv0).waitForMessage(GridNearTxPrepareResponse.class, client.name());
+
+        stopGrid(client.name());
+
+        try {
+            fut.get();
+        }
+        catch (IgniteCheckedException ignore) {
+            // No-op.
+        }
+        // NOTE(review): fixed delay, presumably to let tx recovery finish before the check - confirm.
+        U.sleep(1000);
+        // Expected: old value (1) when write-through is enabled, new value (2) when it is disabled.
+        if (writeThrough)
+            checkData(F.asMap(key, 1));
+        else
+            checkData(F.asMap(key, 2));
+    }
+
+    /**
+     * @param node Node.
+     * @return Node communication SPI.
+     */
+    private TestRecordingCommunicationSpi testSpi(Ignite node) {
+        return (TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi();
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param store Cache store flag.
+     * @param writeThrough Store write through flag.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups, boolean store, boolean writeThrough) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(backups);
+        ccfg.setRebalanceMode(ASYNC);
+
+        if (store) {
+            ccfg.setWriteThrough(writeThrough);
+
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+        }
+
+        return ccfg;
+    }
+
+    /**
+     * @param expData Expected cache data.
+     */
+    private void checkData(Map<Integer, Integer> expData) {
+        assert !expData.isEmpty();
+
+        List<Ignite> nodes = G.allGrids();
+
+        assertFalse(nodes.isEmpty());
+
+        for (Ignite node : nodes) {
+            IgniteCache<Integer, Integer> cache = node.cache(null);
+
+            for (Map.Entry<Integer, Integer> e : expData.entrySet()) {
+                assertEquals("Invalid value [key=" + e.getKey() + ", node=" + node.name() + ']',
+                    e.getValue(),
+                    cache.get(e.getKey()));
+            }
+        }
+    }
+
+    /**
+     * Factory of a cache store backed by the static {@code storeMap}; writes and deletes are logged.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** Logger (field annotated with {@link LoggerResource}). */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return storeMap.get(key);
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    log.info("Store write [key=" + entry.getKey() + ", val=" + entry.getValue() + ']');
+
+                    storeMap.put(entry.getKey(), entry.getValue());
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    log.info("Store delete [key=" + key + ']');
+
+                    storeMap.remove(key);
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
index 1517672..3e56b00 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -153,7 +152,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
      * Check whether caches has no transactions after salvage timeout.
      *
      * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
-     * @param prepare Whether to prepare transaction state (i.e. call {@link IgniteInternalTx#prepare()}).
+     * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare()}).
      * @throws Exception If failed.
      */
     private void checkSalvageAfterTimeout(TransactionConcurrency mode, boolean prepare) throws Exception {
@@ -172,7 +171,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
      *
      * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
      * @param prepare Whether to prepare transaction state
-     *                (i.e. call {@link IgniteInternalTx#prepare()}).
+     *                (i.e. call {@link GridNearTxLocal#prepare()}).
      * @throws Exception If failed.
      */
     private void checkSalvageBeforeTimeout(TransactionConcurrency mode, boolean prepare) throws Exception {
@@ -198,7 +197,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
      * Start new transaction on the grid(0) and put some keys to it.
      *
      * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC).
-     * @param prepare Whether to prepare transaction state (i.e. call {@link IgniteInternalTx#prepare()}).
+     * @param prepare Whether to prepare transaction state (i.e. call {@link GridNearTxLocal#prepare()}).
      * @throws Exception If failed.
      */
     private void startTxAndPutKeys(final TransactionConcurrency mode, final boolean prepare) throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
index aa6a4f5..16596ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java
@@ -125,7 +125,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst
 
                     txCommitLatch.await();
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
                 }
 
                 return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index 7363c7c..7bd7797 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheCo
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedPrimaryNodeFailureRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTxRecoveryRollbackTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxOriginatingNodeFailureSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest;
@@ -54,6 +55,8 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheNearTxPessimisticOriginatingNodeFailureSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest.class);
 
+        suite.addTestSuite(IgniteCacheTxRecoveryRollbackTest.class);
+
         return suite;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
index f552cbc..649f7c4 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
@@ -28,7 +28,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.transactions.TransactionState;
@@ -101,7 +100,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
             log.debug("XA resource rollback(...) [xid=" + xid + "]");
 
         try {
-            cacheTx.rollbackTopLevelTx();
+            cacheTx.rollback();
         }
         catch (IgniteCheckedException e) {
             throwException("Failed to rollback cache transaction: " + e.getMessage(), e);
@@ -147,7 +146,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
             log.debug("XA resource commit(...) [xid=" + xid + ", onePhase=" + onePhase + "]");
 
         try {
-            cacheTx.commitTopLevelTx();
+            cacheTx.commit();
         }
         catch (IgniteCheckedException e) {
             throwException("Failed to commit cache transaction: " + e.getMessage(), e);
@@ -164,7 +163,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
         try {
             cacheTx.invalidate(true);
 
-            cacheTx.commitTopLevelTx();
+            cacheTx.commit();
         }
         catch (IgniteCheckedException e) {
             throwException("Failed to forget cache transaction: " + e.getMessage(), e);
@@ -262,7 +261,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
                     log.debug("Synchronization.afterCompletion(STATUS_COMMITTED) [xid=" + cacheTx.xid() + "]");
 
                 try {
-                    cacheTx.commitTopLevelTx();
+                    cacheTx.commit();
                 }
                 catch (IgniteCheckedException e) {
                     throw new CacheException("Failed to commit cache transaction.", e);
@@ -275,7 +274,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
                     log.debug("Synchronization.afterCompletion(STATUS_ROLLEDBACK) [xid=" + cacheTx.xid() + "]");
 
                 try {
-                    cacheTx.rollbackTopLevelTx();
+                    cacheTx.rollback();
                 }
                 catch (IgniteCheckedException e) {
                     throw new CacheException("Failed to rollback cache transaction.", e);


[3/3] ignite git commit: ignite-4768 txs

Posted by sb...@apache.org.
ignite-4768 txs


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2272fadc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2272fadc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2272fadc

Branch: refs/heads/ignite-4768-1
Commit: 2272fadc5989b2787f14454c91eecde9b1fa7727
Parents: 684d66c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Mar 15 11:01:45 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Mar 15 17:37:02 2017 +0300

----------------------------------------------------------------------
 .../ClientAbstractMultiNodeSelfTest.java        |  13 +-
 .../processors/cache/GridCacheAdapter.java      |  10 +-
 .../processors/cache/GridCacheIoManager.java    |   3 +
 .../cache/GridCacheSharedContext.java           |   8 +-
 .../processors/cache/GridCacheUtils.java        |   3 +-
 .../distributed/GridCacheCommittedTxInfo.java   | 117 -----
 .../GridDistributedTxRemoteAdapter.java         |  55 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  18 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   | 122 ++---
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  12 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  69 ++-
 .../colocated/GridDhtColocatedLockFuture.java   |   7 +-
 .../distributed/near/GridNearLockFuture.java    |   4 +-
 .../distributed/near/GridNearLockRequest.java   | 200 ++------
 .../near/GridNearOptimisticTxPrepareFuture.java |   2 +-
 .../near/GridNearTxFinishFuture.java            |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java |  90 +++-
 .../near/GridNearTxPrepareFutureAdapter.java    |   5 +-
 .../near/GridNearTxPrepareRequest.java          |   2 +-
 .../distributed/near/GridNearTxRemote.java      |   4 +-
 .../cache/transactions/IgniteInternalTx.java    |  68 +--
 .../cache/transactions/IgniteTxAdapter.java     | 136 +----
 .../cache/transactions/IgniteTxHandler.java     |  27 +-
 .../IgniteTxImplicitSingleStateImpl.java        |   4 +-
 .../transactions/IgniteTxLocalAdapter.java      |  36 +-
 .../cache/transactions/IgniteTxLocalEx.java     |   9 +-
 .../cache/transactions/IgniteTxManager.java     | 148 ++----
 .../IgniteTxRemoteStateAdapter.java             |   2 +-
 .../cache/transactions/IgniteTxState.java       |   2 +-
 .../cache/transactions/IgniteTxStateImpl.java   |   4 +-
 .../datastructures/DataStructuresProcessor.java |  28 +-
 .../datastructures/GridCacheAtomicLongImpl.java |  16 +-
 .../GridCacheAtomicReferenceImpl.java           |   4 +-
 .../GridCacheAtomicSequenceImpl.java            |   2 +-
 .../GridCacheAtomicStampedImpl.java             |   4 +-
 .../GridCacheCountDownLatchImpl.java            |   4 +-
 .../datastructures/GridCacheLockImpl.java       |  11 +-
 .../datastructures/GridCacheSemaphoreImpl.java  |  11 +-
 .../GridTransactionalCacheQueueImpl.java        |  10 +-
 .../processors/igfs/IgfsDataManager.java        |   4 +-
 .../processors/igfs/IgfsMetaManager.java        |  34 +-
 .../service/GridServiceProcessor.java           |   2 +-
 .../internal/TestRecordingCommunicationSpi.java |  29 ++
 .../cache/IgniteTxConfigCacheSelfTest.java      |   2 +-
 .../IgniteCacheSystemTransactionsSelfTest.java  |   6 +-
 ...xOriginatingNodeFailureAbstractSelfTest.java |   2 +-
 ...cOriginatingNodeFailureAbstractSelfTest.java |   4 +-
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |   4 +-
 .../dht/IgniteCacheTxRecoveryRollbackTest.java  | 501 +++++++++++++++++++
 .../GridCachePartitionedTxSalvageSelfTest.java  |   7 +-
 ...lockMessageSystemPoolStarvationSelfTest.java |   2 +-
 .../IgniteCacheTxRecoverySelfTestSuite.java     |   3 +
 .../processors/cache/jta/CacheJtaResource.java  |  11 +-
 53 files changed, 956 insertions(+), 929 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
index 7fb2385..2fba49a 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
@@ -480,11 +481,11 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract
     @SuppressWarnings("unchecked")
     private static class TestCommunicationSpi extends TcpCommunicationSpi {
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             checkSyncFlags((GridIoMessage)msg);
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**
@@ -512,13 +513,13 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract
             IgniteInternalTx t = tm.tx(v);
 
             if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x1"))))
-                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x2"))))
-                assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x3"))))
-                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x4"))))
-                assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index bbf19f1..3bfd1f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4150,7 +4150,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
                     new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
                         @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) {
-                            return tx.commitTopLevelTxAsync();
+                            return tx.commitNearTxLocalAsync();
                         }
                     });
 
@@ -4159,7 +4159,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 return f;
             }
 
-            IgniteInternalFuture<IgniteInternalTx> f = tx.commitTopLevelTxAsync();
+            IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
 
             saveFuture(holder, f);
 
@@ -4240,7 +4240,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 catch (IgniteCheckedException e) {
                     if (!(e instanceof IgniteTxRollbackCheckedException)) {
                         try {
-                            tx.rollbackTopLevelTx();
+                            tx.rollback();
 
                             e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
                                 tx.xid(), e);
@@ -4379,7 +4379,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                         throw e;
                                     }
                                     catch (IgniteCheckedException e1) {
-                                        tx0.rollbackAsync();
+                                        tx0.rollbackNearTxLocalAsync();
 
                                         throw e1;
                                     }
@@ -4405,7 +4405,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         throw e;
                     }
                     catch (IgniteCheckedException e1) {
-                        tx0.rollbackAsync();
+                        tx0.rollbackNearTxLocalAsync();
 
                         throw e1;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 99878ec..9cb5c35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -380,6 +380,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             unmarshall(nodeId, cacheMsg);
 
+//            if (!cacheMsg.partitionExchangeMessage())
+//                log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']');
+
             if (cacheMsg.classError() != null)
                 processFailedMessage(nodeId, cacheMsg, c);
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 989a810..39a3baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -732,7 +732,7 @@ public class GridCacheSharedContext<K, V> {
      * @param tx Transaction to close.
      * @throws IgniteCheckedException If failed.
      */
-    public void endTx(IgniteInternalTx tx) throws IgniteCheckedException {
+    public void endTx(GridNearTxLocal tx) throws IgniteCheckedException {
         tx.txState().awaitLastFut(this);
 
         tx.close();
@@ -749,7 +749,7 @@ public class GridCacheSharedContext<K, V> {
         if (ctx == null) {
             tx.txState().awaitLastFut(this);
 
-            return tx.commitTopLevelTxAsync();
+            return tx.commitNearTxLocalAsync();
         }
         else
             return ctx.cache().commitTxAsync(tx);
@@ -760,10 +760,10 @@ public class GridCacheSharedContext<K, V> {
      * @throws IgniteCheckedException If failed.
      * @return Rollback future.
      */
-    public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException {
+    public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException {
         tx.txState().awaitLastFut(this);
 
-        return tx.rollbackAsync();
+        return tx.rollbackNearTxLocalAsync();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 77a99fc..7131612 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -98,7 +98,6 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -1261,7 +1260,7 @@ public class GridCacheUtils {
         try (GridNearTxLocal tx = cache.txStartEx(concurrency, isolation);) {
             clo.applyx(cache);
 
-            tx.commitTopLevelTx();
+            tx.commit();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
deleted file mode 100644
index 875ada0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Committed transaction information. Contains recovery writes that will be used to set commit values
- * in case if originating node crashes.
- */
-@Deprecated
-public class GridCacheCommittedTxInfo implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Originating transaction ID. */
-    private GridCacheVersion originatingTxId;
-
-    /** Originating node ID. */
-    private UUID originatingNodeId;
-
-    /** Recovery writes, i.e. values that have never been sent to remote nodes. */
-    @GridToStringInclude
-    private Collection<IgniteTxEntry> recoveryWrites;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridCacheCommittedTxInfo() {
-        // No-op.
-    }
-
-    /**
-     * @param tx Committed cache transaction.
-     */
-    public GridCacheCommittedTxInfo(IgniteInternalTx tx) {
-        assert !tx.local() || !tx.replicated();
-
-        originatingTxId = tx.nearXidVersion();
-        originatingNodeId = tx.eventNodeId();
-    }
-
-    /**
-     * @return Originating transaction ID (the transaction ID for replicated cache and near transaction ID
-     *      for partitioned cache).
-     */
-    public GridCacheVersion originatingTxId() {
-        return originatingTxId;
-    }
-
-    /**
-     * @return Originating node ID (the local transaction node ID for replicated cache and near node ID
-     *      for partitioned cache).
-     */
-    public UUID originatingNodeId() {
-        return originatingNodeId;
-    }
-
-    /**
-     * @return Collection of recovery writes.
-     */
-    public Collection<IgniteTxEntry> recoveryWrites() {
-        return recoveryWrites == null ? Collections.<IgniteTxEntry>emptyList() : recoveryWrites;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        originatingTxId.writeExternal(out);
-
-        U.writeUuid(out, originatingNodeId);
-
-        U.writeCollection(out, recoveryWrites);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        originatingTxId = new GridCacheVersion();
-
-        originatingTxId.readExternal(in);
-
-        originatingNodeId = U.readUuid(in);
-
-        recoveryWrites = U.readCollection(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheCommittedTxInfo.class, this, "recoveryWrites", recoveryWrites);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 5be1fe9..3fe59c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -342,12 +342,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
-        assert false;
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public Set<IgniteTxKey> readSet() {
         return txState.readSet();
     }
@@ -401,15 +395,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /**
-     * Prepare phase.
-     *
-     * @throws IgniteCheckedException If prepare failed.
-     */
-    @Override public void prepare() throws IgniteCheckedException {
-        prepareRemoteTx();
-    }
-
-    /**
      * @throws IgniteCheckedException If commit failed.
      */
     @SuppressWarnings({"CatchGenericClass"})
@@ -756,11 +741,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         commitIfLocked();
     }
 
-    /** {@inheritDoc} */
-    @Override public void commit() throws IgniteCheckedException {
-        commitRemoteTx();
-    }
-
     /**
      * Forces commit for this tx.
      *
@@ -783,6 +763,35 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?>  salvageTx() {
+        try {
+            systemInvalidate(true);
+
+            prepareRemoteTx();
+
+            if (state() == PREPARING) {
+                if (log.isDebugEnabled())
+                    log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
+                        "by another thread: " + this);
+
+                return null;
+            }
+
+            doneRemote(xidVersion(),
+                Collections.<GridCacheVersion>emptyList(),
+                Collections.<GridCacheVersion>emptyList(),
+                Collections.<GridCacheVersion>emptyList());
+
+            commitRemoteTx();
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to invalidate transaction: " + xidVersion(), e);
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void rollbackRemoteTx() {
         try {
             // Note that we don't evict near entries here -
@@ -801,12 +810,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"CatchGenericClass"})
-    @Override public void rollback() {
-        rollbackRemoteTx();
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
         rollbackRemoteTx();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 0c63e45..1e09eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -178,7 +178,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @throws GridDistributedLockCancelledException If lock has been cancelled.
      */
     @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable GridDhtTxRemote startRemoteTx(UUID nodeId,
+    @Nullable private GridDhtTxRemote startRemoteTx(UUID nodeId,
         GridDhtLockRequest req,
         GridDhtLockResponse res)
         throws IgniteCheckedException, GridDistributedLockCancelledException {
@@ -561,10 +561,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
         if (fail) {
             if (dhtTx != null)
-                dhtTx.rollback();
+                dhtTx.rollbackRemoteTx();
 
             if (nearTx != null) // Even though this should never happen, we leave this check for consistency.
-                nearTx.rollback();
+                nearTx.rollbackRemoteTx();
 
             List<KeyCacheObject> keys = req.keys();
 
@@ -961,8 +961,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             req.futureId(),
                             req.miniId(),
                             req.threadId(),
-                            req.implicitTx(),
-                            req.implicitSingleTx(),
+                            /*implicitTx*/false,
+                            /*implicitSingleTx*/false,
                             ctx.systemTx(),
                             false,
                             ctx.ioPolicy(),
@@ -989,7 +989,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             U.warn(log, msg);
 
                             if (tx != null)
-                                tx.rollback();
+                                tx.rollbackDhtLocal();
 
                             return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
                         }
@@ -1086,7 +1086,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
             if (tx != null) {
                 try {
-                    tx.rollback();
+                    tx.rollbackDhtLocal();
                 }
                 catch (IgniteCheckedException ex) {
                     U.error(log, "Failed to rollback the transaction: " + tx, ex);
@@ -1290,7 +1290,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      */
     private void sendLockReply(
         ClusterNode nearNode,
-        @Nullable IgniteInternalTx tx,
+        @Nullable GridDhtTxLocal tx,
         GridNearLockRequest req,
         GridNearLockResponse res
     ) {
@@ -1328,7 +1328,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 ", res=" + res + ']', e);
 
             if (tx != null)
-                tx.rollbackAsync();
+                tx.rollbackDhtLocalAsync();
 
             // Convert to closure exception as this method is only called form closures.
             throw new GridClosureException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6dd40b0..b1c7e5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -292,82 +292,22 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> prepareAsync() {
-        if (optimistic()) {
-            assert isSystemInvalidate();
-
-            return prepareAsync(
-                null,
-                null,
-                Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
-                0,
-                nearMiniId,
-                null,
-                true);
-        }
-
-        long timeout = remainingTime();
-
-        // For pessimistic mode we don't distribute prepare request.
-        GridDhtTxPrepareFuture fut = prepFut;
-
-        if (fut == null) {
-            // Future must be created before any exception can be thrown.
-            if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture(
-                cctx,
-                this,
-                timeout,
-                nearMiniId,
-                Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
-                true,
-                needReturnValue()))) {
-                if (timeout == -1)
-                    prepFut.onError(timeoutException());
+    @Override public IgniteInternalFuture<?> salvageTx() {
+        systemInvalidate(true);
 
-                return prepFut;
-            }
-        }
-        else
-            // Prepare was called explicitly.
-            return fut;
+        state(PREPARED);
 
-        if (!state(PREPARING)) {
-            if (setRollbackOnly()) {
-                if (timeout == -1)
-                    fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +
-                        this));
-                else
-                    fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() +
-                        ", tx=" + this + ']'));
-            }
-            else
-                fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" +
-                    state() + ", tx=" + this + ']'));
+        if (state() == PREPARING) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
+                    "by another thread: " + this);
 
-            return fut;
+            return null;
         }
 
-        try {
-            userPrepare();
-
-            if (!state(PREPARED)) {
-                setRollbackOnly();
-
-                fut.onError(new IgniteCheckedException("Invalid transaction state for commit [state=" + state() +
-                    ", tx=" + this + ']'));
-
-                return fut;
-            }
-
-            fut.complete();
-
-            return fut;
-        }
-        catch (IgniteCheckedException e) {
-            fut.onError(e);
+        setRollbackOnly();
 
-            return fut;
-        }
+        return rollbackDhtLocalAsync();
     }
 
     /**
@@ -382,7 +322,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @param last {@code True} if this is last prepare request.
      * @return Future that will be completed when locks are acquired.
      */
-    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
+    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
         @Nullable Collection<IgniteTxEntry> reads,
         @Nullable Collection<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
@@ -478,7 +418,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e));
 
             try {
-                rollback();
+                rollbackDhtLocal();
             }
             catch (IgniteTxOptimisticCheckedException e1) {
                 if (log.isDebugEnabled())
@@ -523,7 +463,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             if (prepFut != null)
                 prepFut.get(); // Check for errors.
 
-            boolean finished = finish(commit);
+            boolean finished = localFinish(commit);
 
             if (!finished)
                 err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit +
@@ -544,9 +484,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             fut.finish(commit);
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+    /**
+     * @return Commit future.
+     */
+    public IgniteInternalFuture<IgniteInternalTx> commitDhtLocalAsync() {
         if (log.isDebugEnabled())
             log.debug("Committing dht local tx: " + this);
 
@@ -577,15 +518,29 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+        return commitDhtLocalAsync();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) {
         assert optimistic();
 
         PREP_FUT_UPD.compareAndSet(this, fut, null);
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void rollbackDhtLocal() throws IgniteCheckedException {
+        rollbackDhtLocalAsync().get();
+    }
+
+    /**
+     * @return Rollback future.
+     */
+    public IgniteInternalFuture<IgniteInternalTx> rollbackDhtLocalAsync() {
         final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false);
 
         cctx.mvcc().addFuture(fut, fut.futureId());
@@ -608,8 +563,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+        return rollbackDhtLocalAsync();
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean finish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
         assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate()
             || onePhaseCommit() || state() == PREPARED :
             "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
@@ -617,7 +577,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
         assert nearMiniId != 0;
 
-        return super.finish(commit);
+        return super.localFinish(commit);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8e82c53..0329386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -736,7 +736,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean finish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]");
 
@@ -854,16 +854,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void rollback() throws IgniteCheckedException {
-        try {
-            rollbackAsync().get();
-        }
-        finally {
-            cctx.tm().resetContext();
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
             "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 56884ff..ae570aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -718,18 +718,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo =
                         new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
                             @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
-                                try {
-                                    if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1))
-                                        sendPrepareResponse(res);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + tx.nearNodeId() +
-                                        ", res=" + res,
-                                        ", tx=" + tx,
-                                        e);
-                                }
+                                if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1))
+                                    sendPrepareResponse(res);
                             }
                         };
 
@@ -761,18 +751,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
             }
             else {
-                try {
-                    if (REPLIED_UPD.compareAndSet(this, 0, 1))
-                        sendPrepareResponse(res);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
-                        ", dhtTxId=" + tx.xidVersion() +
-                        ", node=" + tx.nearNodeId() +
-                        ", res=" + res,
-                        ", tx=" + tx,
-                        e);
-                }
+                if (REPLIED_UPD.compareAndSet(this, 0, 1))
+                    sendPrepareResponse(res);
             }
 
             return true;
@@ -784,14 +764,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 try {
                     sendPrepareResponse(res);
                 }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
-                        ", dhtTxId=" + tx.xidVersion() +
-                        ", node=" + tx.nearNodeId() +
-                        ", res=" + res,
-                        ", tx=" + tx,
-                        e);
-                }
                 finally {
                     // Will call super.onDone().
                     onComplete(res);
@@ -819,9 +791,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /**
      * @param res Response.
-     * @throws IgniteCheckedException If failed to send response.
      */
-    private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
+    private void sendPrepareResponse(GridNearTxPrepareResponse res) {
         if (!tx.nearNodeId().equals(cctx.localNodeId())) {
             Throwable err = this.err;
 
@@ -837,13 +808,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 return;
             }
 
-            cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
+            try {
+                cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
 
-            if (msgLog.isDebugEnabled()) {
-                msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() +
-                    ", dhtTxId=" + tx.xidVersion() +
-                    ", node=" + tx.nearNodeId() +
-                    ", res=" + res + ']');
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + tx.nearNodeId() +
+                        ", res=" + res + ']');
+                }
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("Failed to send prepare response, node left [txId=" + tx.nearXidVersion() + "," +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + tx.nearNodeId() +
+                        ", res=" + res + ']');
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.error(msgLog, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + tx.nearNodeId() +
+                        ", res=" + res,
+                    ", tx=" + tx + ']',
+                    e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 0ce380d..79c15fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -917,6 +917,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                                         first = false;
                                     }
 
+                                    assert !implicitTx() && !implicitSingleTx() : tx;
+
                                     req = new GridNearLockRequest(
                                         cctx.cacheId(),
                                         topVer,
@@ -925,8 +927,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                                         futId,
                                         lockVer,
                                         inTx(),
-                                        implicitTx(),
-                                        implicitSingleTx(),
                                         read,
                                         retval,
                                         isolation(),
@@ -982,9 +982,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                 }
             }
 
-            if (inTx() && req != null)
-                req.hasTransforms(tx.hasTransforms());
-
             if (!distributedKeys.isEmpty()) {
                 mapping.distributedKeys(distributedKeys);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ffc84d8..1948df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1045,6 +1045,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                                 first = false;
                                             }
 
+                                            assert !implicitTx() && !implicitSingleTx() : tx;
+
                                             req = new GridNearLockRequest(
                                                 cctx.cacheId(),
                                                 topVer,
@@ -1053,8 +1055,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                                 futId,
                                                 lockVer,
                                                 inTx(),
-                                                implicitTx(),
-                                                implicitSingleTx(),
                                                 read,
                                                 retval,
                                                 isolation(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 229961e..48b508b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -46,6 +46,15 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Flag bit: operation needs a return value (see {@link #needReturnValue()}). */
+    private static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
+
+    /** Flag bit: first lock request for lock operation sent from client node. */
+    private static final int FIRST_CLIENT_REQ_FLAG_MASK = 0x02;
+
+    /** Flag bit: sync commit (see {@link #syncCommit()}). */
+    private static final int SYNC_COMMIT_FLAG_MASK = 0x04;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -55,15 +64,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     /** Filter. */
     private CacheEntryPredicate[] filter;
 
-    /** Implicit flag. */
-    private boolean implicitTx;
-
-    /** Implicit transaction with one key flag. */
-    private boolean implicitSingleTx;
-
-    /** Flag is kept for backward compatibility. */
-    private boolean onePhaseCommit;
-
     /** Array of mapped DHT versions for this entry. */
     @GridToStringInclude
     private GridCacheVersion[] dhtVers;
@@ -74,23 +74,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     /** Task name hash. */
     private int taskNameHash;
 
-    /** Has transforms flag. */
-    private boolean hasTransforms;
-
-    /** Sync commit flag. */
-    private boolean syncCommit;
-
     /** TTL for create operation. */
     private long createTtl;
 
     /** TTL for read operation. */
     private long accessTtl;
 
-    /** Flag indicating whether cache operation requires a previous value. */
-    private boolean retVal;
-
-    /** {@code True} if first lock request for lock operation sent from client node. */
-    private boolean firstClientReq;
+    /** Packed boolean flags (need return value, first client request, sync commit). */
+    private byte flags;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -107,8 +98,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
      * @param futId Future ID.
      * @param lockVer Cache version.
      * @param isInTx {@code True} if implicit transaction lock.
-     * @param implicitTx Flag to indicate that transaction is implicit.
-     * @param implicitSingleTx Implicit-transaction-with-one-key flag.
      * @param isRead Indicates whether implicit lock is for read or write operation.
      * @param retVal Return value flag.
      * @param isolation Transaction isolation.
@@ -133,8 +122,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         IgniteUuid futId,
         GridCacheVersion lockVer,
         boolean isInTx,
-        boolean implicitTx,
-        boolean implicitSingleTx,
         boolean isRead,
         boolean retVal,
         TransactionIsolation isolation,
@@ -174,24 +161,43 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
 
         this.topVer = topVer;
-        this.implicitTx = implicitTx;
-        this.implicitSingleTx = implicitSingleTx;
-        this.syncCommit = syncCommit;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.createTtl = createTtl;
         this.accessTtl = accessTtl;
-        this.retVal = retVal;
-        this.firstClientReq = firstClientReq;
 
         dhtVers = new GridCacheVersion[keyCnt];
+
+        setFlag(syncCommit, SYNC_COMMIT_FLAG_MASK);
+        setFlag(firstClientReq, FIRST_CLIENT_REQ_FLAG_MASK);
+        setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
+    }
+
+    /**
+     * Sets or clears the bits selected by the mask.
+     *
+     * @param flag {@code True} to set the masked bits, {@code false} to clear them.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return {@code True} if any bit selected by the mask is set.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
     }
 
     /**
      * @return {@code True} if first lock request for lock operation sent from client node.
      */
     public boolean firstClientRequest() {
-        return firstClientReq;
+        return isFlag(FIRST_CLIENT_REQ_FLAG_MASK);
     }
 
     /**
@@ -216,24 +222,10 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     }
 
     /**
-     * @return Implicit transaction flag.
-     */
-    public boolean implicitTx() {
-        return implicitTx;
-    }
-
-    /**
-     * @return Implicit-transaction-with-one-key flag.
-     */
-    public boolean implicitSingleTx() {
-        return implicitSingleTx;
-    }
-
-    /**
      * @return Sync commit flag.
      */
     public boolean syncCommit() {
-        return syncCommit;
+        return isFlag(SYNC_COMMIT_FLAG_MASK);
     }
 
     /**
@@ -268,24 +260,10 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     }
 
     /**
-     * @param hasTransforms {@code True} if originating transaction has transform entries.
-     */
-    public void hasTransforms(boolean hasTransforms) {
-        this.hasTransforms = hasTransforms;
-    }
-
-    /**
-     * @return {@code True} if originating transaction has transform entries.
-     */
-    public boolean hasTransforms() {
-        return hasTransforms;
-    }
-
-    /**
      * @return Need return value flag.
      */
     public boolean needReturnValue() {
-        return retVal;
+        return isFlag(NEED_RETURN_VALUE_FLAG_MASK);
     }
 
     /**
@@ -399,66 +377,30 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeBoolean("firstClientReq", firstClientReq))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("hasTransforms", hasTransforms))
-                    return false;
-
-                writer.incrementState();
-
-            case 26:
-                if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
-                    return false;
-
-                writer.incrementState();
-
-            case 27:
-                if (!writer.writeBoolean("implicitTx", implicitTx))
-                    return false;
-
-                writer.incrementState();
-
-            case 28:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 29:
-                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
-                    return false;
-
-                writer.incrementState();
-
-            case 30:
-                if (!writer.writeBoolean("retVal", retVal))
-                    return false;
-
-                writer.incrementState();
-
-            case 31:
+            case 26:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 32:
-                if (!writer.writeBoolean("syncCommit", syncCommit))
-                    return false;
-
-                writer.incrementState();
-
-            case 33:
+            case 27:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 34:
+            case 28:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -513,7 +455,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 24:
-                firstClientReq = reader.readBoolean("firstClientReq");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -521,30 +463,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 25:
-                hasTransforms = reader.readBoolean("hasTransforms");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 26:
-                implicitSingleTx = reader.readBoolean("implicitSingleTx");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 27:
-                implicitTx = reader.readBoolean("implicitTx");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 28:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -552,23 +470,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 29:
-                onePhaseCommit = reader.readBoolean("onePhaseCommit");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 30:
-                retVal = reader.readBoolean("retVal");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 31:
+            case 26:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -576,15 +478,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 32:
-                syncCommit = reader.readBoolean("syncCommit");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 33:
+            case 27:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -592,7 +486,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 34:
+            case 28:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -612,7 +506,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 35;
+        return 29;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index cd3b0ea..6189b38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -160,7 +160,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             if (e instanceof IgniteTxRollbackCheckedException) {
                 if (marked) {
                     try {
-                        tx.rollbackTopLevelTx();
+                        tx.rollback();
                     }
                     catch (IgniteCheckedException ex) {
                         U.error(log, "Failed to automatically rollback transaction: " + tx, ex);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 7387501..1b0566b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -310,7 +310,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         err = new TransactionRollbackException("Failed to commit transaction.", err);
 
                     try {
-                        tx.finish(err == null);
+                        tx.localFinish(err == null);
                     }
                     catch (IgniteCheckedException e) {
                         if (err != null)
@@ -402,7 +402,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         }
 
         try {
-            if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) {
+            if (tx.localFinish(commit) || (!commit && tx.state() == UNKNOWN)) {
                 if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) {
                     if (mappings.single()) {
                         GridDistributedTxMapping mapping = mappings.singleMapping();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 5a5470b..81606d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -117,7 +117,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
  * Replicated user transaction.
  */
 @SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoCloseable  {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -169,6 +169,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /** */
     private boolean hasRemoteLocks;
 
+    /** If this transaction contains transform entries. */
+    protected boolean transform;
+
     /** */
     @GridToStringExclude
     private TransactionProxyImpl proxy;
@@ -687,7 +690,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
             if (implicit())
                 try {
-                    commitTopLevelTx();
+                    commit();
                 }
                 catch (IgniteCheckedException e) {
                     return new GridFinishedFuture<>(e);
@@ -965,7 +968,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
             for (Object key : keys) {
                 if (key == null) {
-                    rollbackTopLevelTx();
+                    rollback();
 
                     throw new NullPointerException("Null key.");
                 }
@@ -1473,7 +1476,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         if (F.isEmpty(keys0)) {
             if (implicit()) {
                 try {
-                    commitTopLevelTx();
+                    commit();
                 }
                 catch (IgniteCheckedException e) {
                     return new GridFinishedFuture<>(e);
@@ -1604,7 +1607,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 // with prepare response, if required.
                 assert loadFut.isDone();
 
-                return nonInterruptable(commitTopLevelTxAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                return nonInterruptable(commitNearTxLocalAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
                     @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
                         throws IgniteCheckedException {
                         try {
@@ -1614,7 +1617,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                                 implicitRes.value(), implicitRes.success());
                         }
                         catch (IgniteCheckedException | RuntimeException e) {
-                            rollbackAsync();
+                            rollbackNearTxLocalAsync();
 
                             throw e;
                         }
@@ -2371,7 +2374,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 return new GridFinishedFuture<>(e);
             }
 
-            return nonInterruptable(commitTopLevelTxAsync().chain(
+            return nonInterruptable(commitNearTxLocalAsync().chain(
                 new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
                     @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
                         throws IgniteCheckedException {
@@ -2390,7 +2393,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                         }
                         catch (IgniteCheckedException | RuntimeException e) {
                             if (!(e instanceof NodeStoppingException))
-                                rollbackAsync();
+                                rollbackNearTxLocalAsync();
 
                             throw e;
                         }
@@ -2986,7 +2989,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean finish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Finishing near local tx [tx=" + this + ", commit=" + commit + "]");
 
@@ -3070,7 +3073,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /**
      * @return Tx prepare future.
      */
-    public IgniteInternalFuture<?> prepareTopLevelTx() {
+    public IgniteInternalFuture<?> prepareNearTxLocal() {
         GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut;
 
         if (fut == null) {
@@ -3106,21 +3109,37 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> prepareAsync() {
-        return prepareTopLevelTx();
+    @Override public IgniteInternalFuture<?> salvageTx() {
+        assert false : "Should not be called for GridNearTxLocal";
+
+        return null;
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void prepare() throws IgniteCheckedException {
+        prepareAsync().get();
+    }
+
+    /**
+     * @return Prepare future.
+     */
+    private IgniteInternalFuture<?> prepareAsync() {
+        return prepareNearTxLocal();
     }
 
     /**
      * @throws IgniteCheckedException If failed.
      */
-    public void commitTopLevelTx() throws IgniteCheckedException {
-        commitTopLevelTxAsync().get();
+    public void commit() throws IgniteCheckedException {
+        commitNearTxLocalAsync().get();
     }
 
     /**
      * @return Finish future.
      */
-    public IgniteInternalFuture<IgniteInternalTx> commitTopLevelTxAsync() {
+    public IgniteInternalFuture<IgniteInternalTx> commitNearTxLocalAsync() {
         if (log.isDebugEnabled())
             log.debug("Committing near local tx: " + this);
 
@@ -3136,7 +3155,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             return new GridFinishedFuture<>((IgniteInternalTx)this);
         }
 
-        final IgniteInternalFuture<?> prepareFut = prepareTopLevelTx();
+        final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
 
         GridNearTxFinishFuture fut = commitFut;
 
@@ -3177,20 +3196,20 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
-        return commitTopLevelTxAsync();
+        return commitNearTxLocalAsync();
     }
 
     /**
      * @throws IgniteCheckedException If failed.
      */
-    public void rollbackTopLevelTx() throws IgniteCheckedException {
-        rollbackTopLevelTxAsync().get();
+    public void rollback() throws IgniteCheckedException {
+        rollbackNearTxLocalAsync().get();
     }
 
     /**
      * @return Rollback future.
      */
-    public IgniteInternalFuture<IgniteInternalTx> rollbackTopLevelTxAsync() {
+    public IgniteInternalFuture<IgniteInternalTx> rollbackNearTxLocalAsync() {
         if (log.isDebugEnabled())
             log.debug("Rolling back near tx: " + this);
 
@@ -3255,7 +3274,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
-        return rollbackTopLevelTxAsync();
+        return rollbackNearTxLocalAsync();
     }
 
     /**
@@ -3332,7 +3351,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e));
 
             try {
-                rollbackTopLevelTx();
+                rollback();
             }
             catch (IgniteTxOptimisticCheckedException e1) {
                 if (log.isDebugEnabled())
@@ -3625,7 +3644,24 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
-        super.close();
+        TransactionState state = state();
+
+        if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
+            rollback();
+
+        synchronized (this) {
+            try {
+                while (!done())
+                    wait();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                if (!done())
+                    throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " +
+                        this, e);
+            }
+        }
 
         if (accessMap != null) {
             assert optimistic();
@@ -3651,8 +3687,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         return prepFut;
     }
 
-    /** {@inheritDoc} */
-    @Override public void onRemap(AffinityTopologyVersion topVer) {
+    /**
+     * @param topVer New topology version.
+     */
+    public void onRemap(AffinityTopologyVersion topVer) {
         assert cctx.kernalContext().clientNode();
 
         mapped = false;
@@ -3933,7 +3971,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
                 // Commit implicit transactions.
                 if (implicit())
-                    commitTopLevelTx();
+                    commit();
 
                 rollback = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index a0f28c5..f9a6353 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteReducer;
@@ -160,8 +159,8 @@ public abstract class GridNearTxPrepareFutureAdapter extends
      *
      * @param txMapping Transaction mapping.
      */
-    protected final void checkOnePhase(GridDhtTxMapping txMapping) {
-        if (tx.storeUsed())
+    final void checkOnePhase(GridDhtTxMapping txMapping) {
+        if (tx.storeWriteThrough())
             return;
 
         Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index ffeeb51..5b0807f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -103,7 +103,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     public GridNearTxPrepareRequest(
         IgniteUuid futId,
         AffinityTopologyVersion topVer,
-        IgniteInternalTx tx,
+        GridNearTxLocal tx,
         long timeout,
         Collection<IgniteTxEntry> reads,
         Collection<IgniteTxEntry> writes,

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 4f4be57..c961f6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -115,7 +115,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
             ctx, 
             nodeId, 
             xidVer,
-            commitVer, 
+            commitVer,
             sys, 
             plc, 
             concurrency, 
@@ -289,7 +289,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
      *
      * @param key Evicted key.
      */
-    public void addEvicted(IgniteTxKey key) {
+    void addEvicted(IgniteTxKey key) {
         evicted.add(key);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index d26696e..6d1e0c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -44,7 +43,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Transaction managed by cache ({@code 'Ex'} stands for external).
  */
-public interface IgniteInternalTx extends AutoCloseable {
+public interface IgniteInternalTx {
     /**
      *
      */
@@ -179,27 +178,6 @@ public interface IgniteInternalTx extends AutoCloseable {
     public boolean isRollbackOnly();
 
     /**
-     * Commits this transaction by initiating {@code two-phase-commit} process.
-     *
-     * @throws IgniteCheckedException If commit failed.
-     */
-    public void commit() throws IgniteCheckedException;
-
-    /**
-     * Ends the transaction. Transaction will be rolled back if it has not been committed.
-     *
-     * @throws IgniteCheckedException If transaction could not be gracefully ended.
-     */
-    @Override public void close() throws IgniteCheckedException;
-
-    /**
-     * Rolls back this transaction.
-     *
-     * @throws IgniteCheckedException If rollback failed.
-     */
-    public void rollback() throws IgniteCheckedException;
-
-    /**
      * Removes metadata by key.
      *
      * @param key Key of the metadata to remove.
@@ -242,7 +220,7 @@ public interface IgniteInternalTx extends AutoCloseable {
      * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with
      *      store enabled.
      */
-    public boolean storeUsed();
+    public boolean storeWriteThrough();
 
     /**
      * Checks if this is system cache transaction. System transactions are isolated from user transactions
@@ -300,11 +278,6 @@ public interface IgniteInternalTx extends AutoCloseable {
     public boolean empty();
 
     /**
-     * @return {@code True} if preparing flag was set with this call.
-     */
-    public boolean markPreparing();
-
-    /**
      * @param status Finalization status to set.
      * @return {@code True} if could mark was set.
      */
@@ -399,11 +372,6 @@ public interface IgniteInternalTx extends AutoCloseable {
     public boolean local();
 
     /**
-     * @return {@code True} if transaction is replicated.
-     */
-    public boolean replicated();
-
-    /**
      * @return Subject ID initiated this transaction.
      */
     public UUID subjectId();
@@ -426,11 +394,6 @@ public interface IgniteInternalTx extends AutoCloseable {
     public boolean user();
 
     /**
-     * @return Transaction write synchronization mode.
-     */
-    public CacheWriteSynchronizationMode syncMode();
-
-    /**
      * @param key Key to check.
      * @return {@code True} if key is present.
      */
@@ -518,18 +481,9 @@ public interface IgniteInternalTx extends AutoCloseable {
     public void commitVersion(GridCacheVersion commitVer);
 
     /**
-     * Prepare state.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void prepare() throws IgniteCheckedException;
-
-    /**
-     * Prepare stage.
-     *
-     * @return Future for prepare step.
+     * @return Future for the salvage operation, may be {@code null}.
      */
-    public IgniteInternalFuture<?> prepareAsync();
+    @Nullable public IgniteInternalFuture<?> salvageTx();
 
     /**
      * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
@@ -663,7 +617,8 @@ public interface IgniteInternalTx extends AutoCloseable {
      * @param committed Committed transactions relative to base.
      * @param rolledback Rolled back transactions relative to base.
      */
-    public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed,
+    public void completedVersions(GridCacheVersion base,
+        Collection<GridCacheVersion> committed,
         Collection<GridCacheVersion> rolledback);
 
     /**
@@ -677,17 +632,6 @@ public interface IgniteInternalTx extends AutoCloseable {
     public boolean onePhaseCommit();
 
     /**
-     * @return {@code True} if transaction has transform entries. This flag will be only set for local
-     *      transactions.
-     */
-    public boolean hasTransforms();
-
-    /**
-     * @param topVer New topology version.
-     */
-    public void onRemap(AffinityTopologyVersion topVer);
-
-    /**
      * @param e Commit error.
      */
     public void commitError(Throwable e);


[2/3] ignite git commit: ignite-4768 txs

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 98f1140..13ca26a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -39,7 +39,6 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -91,7 +90,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 import static org.apache.ignite.transactions.TransactionState.ACTIVE;
-import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -192,12 +190,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     /** */
     protected boolean onePhaseCommit;
 
-    /** */
-    protected CacheWriteSynchronizationMode syncMode;
-
-    /** If this transaction contains transform entries. */
-    protected boolean transform;
-
     /** Commit version. */
     private volatile GridCacheVersion commitVer;
 
@@ -207,9 +199,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     /** Done marker. */
     protected volatile boolean isDone;
 
-    /** Preparing flag (no need for volatile modifier). */
-    private boolean preparing;
-
     /** */
     @GridToStringInclude
     private Map<Integer, Set<Integer>> invalidParts;
@@ -416,8 +405,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
-    @Override public boolean storeUsed() {
-        return storeEnabled() && txState().storeUsed(cctx);
+    @Override public boolean storeWriteThrough() {
+        return storeEnabled() && txState().storeWriteThrough(cctx);
     }
 
     /**
@@ -508,32 +497,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onRemap(AffinityTopologyVersion topVer) {
-        assert false : this;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasTransforms() {
-        return transform;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean markPreparing() {
-        synchronized (this) {
-            if (preparing)
-                return false;
-
-            preparing = true;
-
-            return true;
-        }
-    }
-
     /**
      * @return {@code True} if marked.
      */
-    @Override public boolean markFinalizing(FinalizationStatus status) {
+    @Override public final boolean markFinalizing(FinalizationStatus status) {
         boolean res;
 
         switch (status) {
@@ -625,26 +592,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
-    @Override public boolean replicated() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public CacheWriteSynchronizationMode syncMode() {
-        if (syncMode != null)
-            return syncMode;
-
-        return txState().syncMode(cctx);
-    }
-
-    /**
-     * @param syncMode Write synchronization mode.
-     */
-    public void syncMode(CacheWriteSynchronizationMode syncMode) {
-        this.syncMode = syncMode;
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteUuid xid() {
         return xidVer.asGridUuid();
     }
@@ -897,30 +844,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
     }
 
-    /**
-     *
-     */
-    @Override public void close() throws IgniteCheckedException {
-        TransactionState state = state();
-
-        if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
-            rollback();
-
-        synchronized (this) {
-            try {
-                while (!done())
-                    wait();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                if (!done())
-                    throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " +
-                        this, e);
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public boolean needsCompletedVersions() {
         return false;
@@ -1176,12 +1099,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isSystemInvalidate() {
+    @Override public final boolean isSystemInvalidate() {
         return sysInvalidate;
     }
 
     /** {@inheritDoc} */
-    @Override public void systemInvalidate(boolean sysInvalidate) {
+    @Override public final void systemInvalidate(boolean sysInvalidate) {
         this.sysInvalidate = sysInvalidate;
     }
 
@@ -1950,21 +1873,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public void commit() {
-            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void rollback() {
-            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean activeCachesDeploymentEnabled() {
             return false;
         }
@@ -1995,7 +1903,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean storeUsed() {
+        @Override public boolean storeWriteThrough() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
 
@@ -2029,11 +1937,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public void onRemap(AffinityTopologyVersion topVer) {
-            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
-        }
-
-        /** {@inheritDoc} */
         @Override public void commitError(Throwable e) {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
@@ -2044,11 +1947,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean markPreparing() {
-            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean markFinalizing(FinalizationStatus status) {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
@@ -2134,11 +2032,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean replicated() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
         @Override public UUID subjectId() {
             return null;
         }
@@ -2154,11 +2047,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public CacheWriteSynchronizationMode syncMode() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean hasWriteKey(IgniteTxKey key) {
             return false;
         }
@@ -2236,12 +2124,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public void prepare() throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+        @Override public IgniteInternalFuture<?> salvageTx() {
             return null;
         }
 
@@ -2371,11 +2254,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public boolean hasTransforms() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 77387b0..4a1e085 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -255,7 +255,7 @@ public class IgniteTxHandler {
             req.last());
 
         if (locTx.isRollbackOnly())
-            locTx.rollbackAsync();
+            locTx.rollbackNearTxLocalAsync();
 
         return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
             @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
@@ -491,7 +491,7 @@ public class IgniteTxHandler {
 
             if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
                 if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
-                    tx.rollbackAsync();
+                    tx.rollbackDhtLocalAsync();
             }
 
             final GridDhtTxLocal tx0 = tx;
@@ -849,10 +849,11 @@ public class IgniteTxHandler {
             assert req.syncMode() != null : req;
 
             tx.syncMode(req.syncMode());
+            tx.nearFinishFutureId(req.futureId());
+            tx.nearFinishMiniId(req.miniId());
+            tx.storeEnabled(req.storeEnabled());
 
             if (req.commit()) {
-                tx.storeEnabled(req.storeEnabled());
-
                 if (!tx.markFinalizing(USER_FINISH)) {
                     if (log.isDebugEnabled())
                         log.debug("Will not finish transaction (it is handled by another thread): " + tx);
@@ -860,10 +861,7 @@ public class IgniteTxHandler {
                     return null;
                 }
 
-                tx.nearFinishFutureId(req.futureId());
-                tx.nearFinishMiniId(req.miniId());
-
-                IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync();
+                IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitDhtLocalAsync();
 
                 // Only for error logging.
                 commitFut.listen(CU.errorLogger(log));
@@ -871,10 +869,7 @@ public class IgniteTxHandler {
                 return commitFut;
             }
             else {
-                tx.nearFinishFutureId(req.futureId());
-                tx.nearFinishMiniId(req.miniId());
-
-                IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+                IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackDhtLocalAsync();
 
                 // Only for error logging.
                 rollbackFut.listen(CU.errorLogger(log));
@@ -891,7 +886,7 @@ public class IgniteTxHandler {
 
             IgniteInternalFuture<IgniteInternalTx> res;
 
-            IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+            IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackDhtLocalAsync();
 
             // Only for error logging.
             rollbackFut.listen(CU.errorLogger(log));
@@ -932,7 +927,7 @@ public class IgniteTxHandler {
                 throw e;
 
             if (tx != null)
-                return tx.rollbackAsync();
+                return tx.rollbackNearTxLocalAsync();
 
             return new GridFinishedFuture<>(e);
         }
@@ -1157,7 +1152,7 @@ public class IgniteTxHandler {
 
             if (completeFut != null) {
                 completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
+                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
                         sendReply(nodeId, req, true, nearTxId);
                     }
                 });
@@ -1561,8 +1556,6 @@ public class IgniteTxHandler {
                 assert !F.isEmpty(req.transactionNodes()) :
                     "Received last prepare request with empty transaction nodes: " + req;
 
-                tx.transactionNodes(req.transactionNodes());
-
                 tx.state(PREPARED);
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index bffb295..9417e1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -154,13 +154,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+    @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
         if (cacheCtx == null)
             return false;
 
         CacheStoreManager store = cacheCtx.store();
 
-        return store.configured();
+        return store.configured() && store.isWriteThrough();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d457399..dc4e52f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -30,6 +30,7 @@ import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -139,6 +140,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @GridToStringInclude
     protected IgniteTxLocalState txState;
 
+    /** Explicitly set write synchronization mode, if {@code null} the mode is taken from tx state. */
+    protected CacheWriteSynchronizationMode syncMode;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -199,6 +203,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl();
     }
 
+    /**
+     * @return Transaction write synchronization mode.
+     */
+    public final CacheWriteSynchronizationMode syncMode() {
+        if (syncMode != null)
+            return syncMode;
+
+        return txState().syncMode(cctx);
+    }
+
+    /**
+     * @param syncMode Write synchronization mode.
+     */
+    public void syncMode(CacheWriteSynchronizationMode syncMode) {
+        this.syncMode = syncMode;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteTxState txState() {
         return txState;
@@ -410,21 +431,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void commit() throws IgniteCheckedException {
-        try {
-            commitAsync().get();
-        }
-        finally {
-            cctx.tm().resetContext();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepare() throws IgniteCheckedException {
-        prepareAsync().get();
-    }
-
     /**
      * Checks that locks are in proper state for commit.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 0cf1d67..307c348 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -17,15 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.util.Collection;
-import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -59,5 +52,5 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @return {@code True} if state has been changed.
      * @throws IgniteCheckedException If finish failed.
      */
-    public boolean finish(boolean commit) throws IgniteCheckedException;
+    public boolean localFinish(boolean commit) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ff4a4e6..af406fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -82,7 +82,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -127,7 +126,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Slow tx warn timeout (initialized to 0). */
     private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0);
 
-    /** Tx salvage timeout (default 3s). */
+    /** Tx salvage timeout. */
     private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
 
     /** One phase commit deferred ack request timeout. */
@@ -138,9 +137,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
         Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
 
-    /** Version in which deadlock detection introduced. */
-    public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
-
     /** Deadlock detection maximum iterations. */
     static int DEADLOCK_MAX_ITERS =
         IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
@@ -184,7 +180,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             PER_SEGMENT_Q);
 
     /** Pending one phase commit ack requests sender. */
-    private GridDeferredAckMessageSender deferredAckMessageSender;
+    private GridDeferredAckMessageSender deferredAckMsgSnd;
 
     /** Transaction finish synchronizer. */
     private GridCacheTxFinishSync txFinishSync;
@@ -216,7 +212,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         txHnd = new IgniteTxHandler(cctx);
 
-        deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+        deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
             @Override public int getTimeout() {
                 return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
             }
@@ -256,6 +252,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     UUID nodeId = discoEvt.eventNode().id();
 
+                    // Wait some time in case there are some unprocessed messages from failed node.
                     cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
 
                     if (txFinishSync != null)
@@ -305,85 +302,35 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * Invalidates transaction.
      *
      * @param tx Transaction.
-     * @return {@code True} if transaction was salvaged by this call.
      */
-    public boolean salvageTx(IgniteInternalTx tx) {
-        return salvageTx(tx, false, USER_FINISH);
+    public void salvageTx(IgniteInternalTx tx) {
+        salvageTx(tx, USER_FINISH);
     }
 
     /**
      * Invalidates transaction.
      *
      * @param tx Transaction.
-     * @param warn {@code True} if warning should be logged.
      * @param status Finalization status.
-     * @return {@code True} if transaction was salvaged by this call.
      */
-    private boolean salvageTx(IgniteInternalTx tx, boolean warn, IgniteInternalTx.FinalizationStatus status) {
+    private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) {
         assert tx != null;
 
         TransactionState state = tx.state();
 
-        if (state == ACTIVE || state == PREPARING || state == PREPARED) {
-            try {
-                if (!tx.markFinalizing(status)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
-
-                    return false;
-                }
-
-                tx.systemInvalidate(true);
-
-                tx.prepare();
-
-                if (tx.state() == PREPARING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
-                            "by another thread: " + tx);
-
-                    return false;
-                }
-
-                if (tx instanceof IgniteTxRemoteEx) {
-                    IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
-                    rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(),
-                        Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList());
-                }
-
-                tx.commit();
-
-                if (warn) {
-                    // This print out cannot print any peer-deployed entity either
-                    // directly or indirectly.
-                    U.warn(log, "Invalidated transaction because originating node either " +
-                        "crashed or left grid: " + CU.txString(tx));
-                }
-            }
-            catch (IgniteCheckedException ignore) {
+        if (state == ACTIVE || state == PREPARING || state == PREPARED || state == MARKED_ROLLBACK) {
+            if (!tx.markFinalizing(status)) {
                 if (log.isDebugEnabled())
-                    log.debug("Optimistic failure while invalidating transaction (will rollback): " +
-                        tx.xidVersion());
+                    log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
 
-                try {
-                    tx.rollback();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
-                }
-            }
-        }
-        else if (state == MARKED_ROLLBACK) {
-            try {
-                tx.rollback();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
+                return;
             }
-        }
 
-        return true;
+            tx.salvageTx();
+
+            if (log.isDebugEnabled())
+                log.debug("Invalidated transaction because originating node left grid: " + CU.txString(tx));
+        }
     }
 
     /**
@@ -427,7 +374,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return {@code True} if transaction has been committed or rolled back,
      *      {@code false} otherwise.
      */
-    public boolean isCompleted(IgniteInternalTx tx) {
+    private boolean isCompleted(IgniteInternalTx tx) {
         boolean completed = completedVersHashMap.containsKey(tx.xidVersion());
 
         // Need check that for tx with timeout rollback message was not received before lock.
@@ -1237,7 +1184,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 unlockMultiple(tx, tx.readEntries());
 
             // 6. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 7. Remove obsolete entries from cache.
             removeObsolete(tx);
@@ -1310,7 +1257,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 unlockMultiple(tx, tx.readEntries());
 
             // 4. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 5. Remove obsolete entries.
             removeObsolete(tx);
@@ -1360,7 +1307,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         if (txIdMap.remove(tx.xidVersion(), tx)) {
             // 1. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 2. Evict near entries.
             if (!tx.readMap().isEmpty()) {
@@ -1396,7 +1343,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      *
      * @param tx Tx to uncommit.
      */
-    public void uncommitTx(IgniteInternalTx tx) {
+    void uncommitTx(IgniteInternalTx tx) {
         assert tx != null;
 
         if (log.isDebugEnabled())
@@ -1413,15 +1360,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 unlockMultiple(tx, tx.readEntries());
 
             // 3. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 4. Remove from per-thread storage.
             clearThreadMap(tx);
 
             // 5. Unregister explicit locks.
-            if (!tx.alternateVersions().isEmpty())
+            if (!tx.alternateVersions().isEmpty()) {
                 for (GridCacheVersion ver : tx.alternateVersions())
                     idMap.remove(ver);
+            }
 
             // 6. Remove Near-2-DHT mappings.
             if (tx instanceof GridCacheMappedVersion)
@@ -1477,7 +1425,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @param tx Transaction to notify evictions for.
      */
-    private void notifyEvitions(IgniteInternalTx tx) {
+    private void notifyEvictions(IgniteInternalTx tx) {
         if (tx.internal())
             return;
 
@@ -2056,43 +2004,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             return;
         }
 
-        if (supportsDeadlockDetection(node)) {
-            TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
+        TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
 
-            try {
-                if (!cctx.localNodeId().equals(nodeId))
-                    req.prepareMarshal(cctx);
-
-                cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
-            }
-            catch (IgniteCheckedException e) {
-                if (e instanceof ClusterTopologyCheckedException) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to finish deadlock detection, node left: " + nodeId);
-                }
-                else
-                    U.warn(log, "Failed to finish deadlock detection: " + e, e);
+        try {
+            if (!cctx.localNodeId().equals(nodeId))
+                req.prepareMarshal(cctx);
 
-                fut.onDone();
-            }
+            cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
         }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node);
+        catch (IgniteCheckedException e) {
+            if (e instanceof ClusterTopologyCheckedException) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to finish deadlock detection, node left: " + nodeId);
+            }
+            else
+                U.warn(log, "Failed to finish deadlock detection: " + e, e);
 
             fut.onDone();
         }
     }
 
     /**
-     * @param node Node.
-     * @return {@code True} if node supports deadlock detection protocol.
-     */
-    private boolean supportsDeadlockDetection(ClusterNode node) {
-        return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0;
-    }
-
-    /**
      * @param tx Tx.
      * @param txKeys Tx keys.
      * @return {@code True} if key is involved into tx.
@@ -2263,7 +2195,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param ver Version to ack.
      */
     public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
-        deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+        deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver);
     }
 
     /**
@@ -2312,9 +2244,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         ", failedNodeId=" + evtNodeId + ']');
 
                 for (final IgniteInternalTx tx : txs()) {
-                    if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) {
+                    if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) {
                         // Invalidate transactions.
-                        salvageTx(tx, false, RECOVERY_FINISH);
+                        salvageTx(tx, RECOVERY_FINISH);
                     }
                     else {
                         // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 1c2ccbe..3c27bad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -88,7 +88,7 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
-    @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+    @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index c121b1b..822e44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -104,7 +104,7 @@ public interface IgniteTxState {
      * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with
      *      store enabled.
      */
-    public boolean storeUsed(GridCacheSharedContext cctx);
+    public boolean storeWriteThrough(GridCacheSharedContext cctx);
 
     /**
      * @param cctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 76751de..399eea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -289,14 +289,14 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+    @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
         if (!activeCacheIds.isEmpty()) {
             for (int i = 0; i < activeCacheIds.size(); i++) {
                 int cacheId = (int)activeCacheIds.get(i);
 
                 CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
-                if (store.configured())
+                if (store.configured() && store.isWriteThrough())
                     return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index c8c9219..0420182 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -395,7 +395,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     dsMap.put(key, seq);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return seq;
                 }
@@ -496,7 +496,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     dsMap.put(key, a);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return a;
                 }
@@ -560,7 +560,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     T dataStructure = c.applyx();
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return dataStructure;
                 }
@@ -641,7 +641,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     T rmvInfo = c.applyx();
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     if (afterRmv != null && rmvInfo != null)
                         afterRmv.applyx(rmvInfo);
@@ -709,7 +709,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     dsMap.put(key, ref);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return ref;
                 }
@@ -813,7 +813,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     dsMap.put(key, stmp);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return stmp;
                 }
@@ -1048,7 +1048,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     T col = c.applyx(cacheCtx);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return col;
                 }
@@ -1162,7 +1162,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     dsMap.put(key, latch);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return latch;
                 }
@@ -1211,7 +1211,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                         dsView.remove(key);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
                     }
                     else
                         tx.setRollbackOnly();
@@ -1283,7 +1283,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     dsMap.put(key, sem0);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return sem0;
                 }
@@ -1329,7 +1329,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                         dsView.remove(key);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
                     }
                     else
                         tx.setRollbackOnly();
@@ -1401,7 +1401,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                     dsMap.put(key, reentrantLock0);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return reentrantLock0;
                 }
@@ -1448,7 +1448,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                         dsView.remove(key);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
                     }
                     else
                         tx.setRollbackOnly();
@@ -1481,7 +1481,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                         if (val != null) {
                             dsView.remove(key);
 
-                            tx.commitTopLevelTx();
+                            tx.commit();
                         }
                         else
                             tx.setRollbackOnly();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index 9ebea2c..640b72d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -102,7 +102,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                 atomicView.put(key, val);
 
-                tx.commitTopLevelTx();
+                tx.commit();
 
                 return retVal;
             }
@@ -129,7 +129,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                 atomicView.put(key, val);
 
-                tx.commitTopLevelTx();
+                tx.commit();
 
                 return retVal;
             }
@@ -156,7 +156,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                 atomicView.put(key, val);
 
-                tx.commitTopLevelTx();
+                tx.commit();
 
                 return retVal;
             }
@@ -183,7 +183,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                 atomicView.put(key, val);
 
-                tx.commitTopLevelTx();
+                tx.commit();
 
                 return retVal;
             }
@@ -442,7 +442,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                     atomicView.put(key, val);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return retVal;
                 }
@@ -476,7 +476,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                     atomicView.put(key, val);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return retVal;
                 }
@@ -510,7 +510,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                     atomicView.put(key, val);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return retVal;
                 }
@@ -547,7 +547,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
 
                         atomicView.getAndPut(key, val);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
                     }
 
                     return retVal;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 51568bc..6911b3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -223,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
 
                     atomicView.put(key, ref);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return true;
                 }
@@ -265,7 +265,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
 
                         atomicView.getAndPut(key, ref);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
 
                         return expVal;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 2572f19..87aae8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -545,7 +545,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
 
                     seqView.put(key, seq);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return curLocVal;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index ec1e766..14f80e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -277,7 +277,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
 
                     atomicView.put(key, stmp);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return true;
                 }
@@ -321,7 +321,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
 
                         atomicView.getAndPut(key, stmp);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
 
                         return true;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 03a7fb6..45c3677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -292,7 +292,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                                     return new CountDownLatch(0);
                                 }
 
-                                tx.commitTopLevelTx();
+                                tx.commit();
 
                                 return new CountDownLatch(val.get());
                             }
@@ -432,7 +432,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
 
                 latchView.put(key, latchVal);
 
-                tx.commitTopLevelTx();
+                tx.commit();
 
                 return retVal;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index a62b656..5f0cb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -561,7 +560,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
 
                                         lockView.put(key, val);
 
-                                        tx.commitTopLevelTx();
+                                        tx.commit();
 
                                         return true;
                                     }
@@ -629,7 +628,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
 
                                     lockView.put(key, val);
 
-                                    tx.commitTopLevelTx();
+                                    tx.commit();
 
                                     // Keep track of all threads that are queued in global queue.
                                     // We deliberately don't use #sync.isQueued(), because AQS
@@ -647,7 +646,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
 
                                         lockView.put(key, val);
 
-                                        tx.commitTopLevelTx();
+                                        tx.commit();
 
                                         sync.waitingThreads.remove(thread.getId());
 
@@ -806,7 +805,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
 
                                 lockView.put(key, val);
 
-                                tx.commitTopLevelTx();
+                                tx.commit();
 
                                 return true;
                             }
@@ -1099,7 +1098,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
                                     return null;
                                 }
 
-                                tx.rollbackTopLevelTx();
+                                tx.rollback();
 
                                 return new Sync(val);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index c3e9218..a1c0515 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -320,7 +319,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
                                     semView.put(key, val);
 
-                                    tx.commitTopLevelTx();
+                                    tx.commit();
                                 }
 
                                 return retVal;
@@ -373,7 +372,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                 Map<UUID, Integer> map = val.getWaiters();
 
                                 if (!map.containsKey(nodeId)) {
-                                    tx.rollbackTopLevelTx();
+                                    tx.rollback();
 
                                     return false;
                                 }
@@ -391,7 +390,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
                                 sync.nodeMap = map;
 
-                                tx.commitTopLevelTx();
+                                tx.commit();
 
                                 return true;
                             }
@@ -472,7 +471,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
                                 final boolean failoverSafe = val.isFailoverSafe();
 
-                                tx.commitTopLevelTx();
+                                tx.commit();
 
                                 return new Sync(cnt, waiters, failoverSafe);
                             }
@@ -687,7 +686,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
                             int cnt = val.getCount();
 
-                            tx.rollbackTopLevelTx();
+                            tx.rollback();
 
                             return cnt;
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 4b2d6cc..846eb69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -70,7 +70,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                         else
                             retVal = false;
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
 
                         return retVal;
                     }
@@ -106,7 +106,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                                 retVal = (T)cache.getAndRemove(itemKey(idx));
 
                                 if (retVal == null) { // Possible if data was lost.
-                                    tx.commitTopLevelTx();
+                                    tx.commit();
 
                                     continue;
                                 }
@@ -114,7 +114,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                             else
                                 retVal = null;
 
-                            tx.commitTopLevelTx();
+                            tx.commit();
 
                             return retVal;
                         }
@@ -164,7 +164,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                         else
                             retVal = false;
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
 
                         return retVal;
                     }
@@ -197,7 +197,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                             cache.remove(itemKey(idx));
                         }
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
                     }
 
                     return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index dce97c7..acd0a1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -661,7 +661,7 @@ public class IgfsDataManager extends IgfsManager {
                                 if (val != null) {
                                     putBlock(fileInfo.blockSize(), key, val);
 
-                                    tx.commitTopLevelTx();
+                                    tx.commit();
                                 }
                                 else {
                                     // File is being concurrently deleted.
@@ -1086,7 +1086,7 @@ public class IgfsDataManager extends IgfsManager {
                     "[key=" + colocatedKey + ", relaxedKey=" + key + ", startOff=" + startOff +
                     ", dataLen=" + data.length + ']');
 
-            tx.commitTopLevelTx();
+            tx.commit();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 9ff3d40..77272e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -615,7 +615,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                     IgfsEntryInfo newInfo = invokeLock(fileId, del);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return newInfo;
                 }
@@ -1039,7 +1039,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                     transferEntry(srcEntry, srcParentInfo.id(), srcName, dstParentInfo.id(), dstName);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     // Fire events.
                     IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
@@ -1172,7 +1172,7 @@ public class IgfsMetaManager extends IgfsManager {
                     // Note that root directory properties and other attributes are preserved:
                     id2InfoPrj.put(IgfsUtils.ROOT_ID, rootInfo.listing(null));
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     signalDeleteWorker();
 
@@ -1310,7 +1310,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                         transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
 
                         signalDeleteWorker();
 
@@ -1401,7 +1401,7 @@ public class IgfsMetaManager extends IgfsManager {
                         id2InfoPrj.put(parentId, parentInfo.listing(newListing));
                     }
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return res;
                 }
@@ -1454,7 +1454,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                         id2InfoPrj.remove(id);
 
-                        tx.commitTopLevelTx();
+                        tx.commit();
 
                         return true;
                     }
@@ -1519,7 +1519,7 @@ public class IgfsMetaManager extends IgfsManager {
                 try (GridNearTxLocal tx = startTx()) {
                     IgfsEntryInfo info = updatePropertiesNonTx(fileId, props);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return info;
                 }
@@ -1560,7 +1560,7 @@ public class IgfsMetaManager extends IgfsManager {
                     IgfsEntryInfo newInfo =
                         invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange));
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return newInfo;
                 }
@@ -1616,7 +1616,7 @@ public class IgfsMetaManager extends IgfsManager {
                         throw fsException("Failed to update file info (file types differ)" +
                             " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return newInfo;
                 }
@@ -1679,7 +1679,7 @@ public class IgfsMetaManager extends IgfsManager {
                             continue;
 
                         // Commit TX.
-                        tx.commitTopLevelTx();
+                        tx.commit();
 
                         generateCreateEvents(res.createdPaths(), false);
 
@@ -1711,7 +1711,7 @@ public class IgfsMetaManager extends IgfsManager {
                 try (GridNearTxLocal tx = startTx()) {
                     Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling);
 
-                    tx.commitTopLevelTx();
+                    tx.commit();
 
                     return !F.eq(prev, val);
                 }
@@ -2756,7 +2756,7 @@ public class IgfsMetaManager extends IgfsManager {
                     }
                 }
 
-                tx.commitTopLevelTx();
+                tx.commit();
             }
             catch (IgniteCheckedException e) {
                 if (!finished) {
@@ -2839,7 +2839,7 @@ public class IgfsMetaManager extends IgfsManager {
                                 modificationTime == -1 ? targetInfo.modificationTime() : modificationTime)
                             );
 
-                            tx.commitTopLevelTx();
+                            tx.commit();
 
                             return;
                         }
@@ -2948,7 +2948,7 @@ public class IgfsMetaManager extends IgfsManager {
                             // At this point we can open the stream safely.
                             info = invokeLock(info.id(), false);
 
-                            tx.commitTopLevelTx();
+                            tx.commit();
 
                             IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
@@ -2963,7 +2963,7 @@ public class IgfsMetaManager extends IgfsManager {
                                 continue;
 
                             // Commit.
-                            tx.commitTopLevelTx();
+                            tx.commit();
 
                             // Generate events.
                             generateCreateEvents(res.createdPaths(), true);
@@ -3103,7 +3103,7 @@ public class IgfsMetaManager extends IgfsManager {
                                     newBlockSize, affKey, newLockId, evictExclude, newLen));
 
                             // Prepare result and commit.
-                            tx.commitTopLevelTx();
+                            tx.commit();
 
                             IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
@@ -3131,7 +3131,7 @@ public class IgfsMetaManager extends IgfsManager {
                                 continue;
 
                             // Commit.
-                            tx.commitTopLevelTx();
+                            tx.commit();
 
                             // Generate events.
                             generateCreateEvents(res.createdPaths(), true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 280817c..a680a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1042,7 +1042,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
                 cache.put(key, assigns);
 
-                tx.commitTopLevelTx();
+                tx.commit();
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 53e6add..c4d8a79 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -88,6 +88,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
 
                     blockedMsgs.add(new T2<>(node, ioMsg));
 
+                    notifyAll();
+
                     return;
                 }
             }
@@ -137,6 +139,33 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
+     * @param cls Exact message class to wait for (matched with {@code ==}, so subclasses do not match).
+     * @param nodeName Instance name of the node recorded with the blocked message.
+     * @throws InterruptedException If interrupted while waiting for a matching blocked message.
+     */
+    public void waitForMessage(Class<?> cls, String nodeName) throws InterruptedException {
+        synchronized (this) {
+            while (!hasMessage(cls, nodeName))
+                wait();
+        }
+    }
+
+    /**
+     * @param cls Exact message class to look for (matched with {@code ==}).
+     * @param nodeName Expected {@code ATTR_IGNITE_INSTANCE_NAME} attribute value of the node stored with the message.
+     * @return {@code True} if {@code blockedMsgs} holds a message of this class for a node with this name.
+     */
+    private boolean hasMessage(Class<?> cls, String nodeName) {
+        for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+            if (msg.get2().message().getClass() == cls &&
+                nodeName.equals(msg.get1().attribute(ATTR_IGNITE_INSTANCE_NAME)))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param blockP Message block predicate.
      */
     public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 84e439f..4fd4989 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -217,7 +217,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
 
             cache.put("key", "val");
 
-            tx.commitTopLevelTx();
+            tx.commit();
         }
 
         assert cache.containsKey("key");

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
index f79c3e7..f821a45 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.Map;
-import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
@@ -28,12 +26,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -92,7 +88,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf
 
                 utilityCache.getAndPut("3", "3");
 
-                itx.commitTopLevelTx();
+                itx.commit();
             }
 
             jcache.put("2", "22");

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 65fa7e0..91e3b26 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -176,7 +176,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
                 cache.putAll(map);
 
                 try {
-                    txEx.prepareTopLevelTx().get(3, TimeUnit.SECONDS);
+                    txEx.prepareNearTxLocal().get(3, TimeUnit.SECONDS);
                 }
                 catch (IgniteFutureTimeoutCheckedException ignored) {
                     info("Failed to wait for prepare future completion: " + partial);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
index 3c1ae8e..4997b20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -349,7 +349,7 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
 
             TransactionProxyImpl txProxy = (TransactionProxyImpl)tx;
 
-            IgniteInternalTx txEx = txProxy.tx();
+            GridNearTxLocal txEx = txProxy.tx();
 
             assertTrue(txEx.pessimistic());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2272fadc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 97385ab..7ca3914 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -215,7 +215,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
             commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
 
-            IgniteInternalFuture<?> prepFut = txEx.prepareTopLevelTx();
+            IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal();
 
             waitPrepared(ignite(1));
 
@@ -376,7 +376,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
         commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
 
-        IgniteInternalFuture<?> prepFut = txEx.prepareTopLevelTx();
+        IgniteInternalFuture<?> prepFut = txEx.prepareNearTxLocal();
 
         waitPrepared(ignite(1));