You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/20 17:45:58 UTC

[5/6] ignite git commit: IGNITE-7910 Transaction rollback on PME

IGNITE-7910 Transaction rollback on PME


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee5fbce3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee5fbce3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee5fbce3

Branch: refs/heads/ignite-2.5.1.b5
Commit: ee5fbce35f95ede8f115a21a785adccb7f85aca6
Parents: 553e0ff
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Apr 20 20:18:33 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 20 20:18:34 2018 +0300

----------------------------------------------------------------------
 .../ignite/tests/utils/TestTransaction.java     |   5 +
 .../java/org/apache/ignite/IgniteCluster.java   |   7 +
 .../org/apache/ignite/IgniteTransactions.java   |  23 +-
 .../configuration/TransactionConfiguration.java |  41 +
 .../ignite/internal/TransactionsMXBeanImpl.java | 162 ++++
 .../cluster/IgniteClusterAsyncImpl.java         |   5 +
 .../internal/cluster/IgniteClusterImpl.java     |  15 +
 .../ignite/internal/commandline/Arguments.java  |  16 +-
 .../ignite/internal/commandline/Command.java    |   5 +-
 .../internal/commandline/CommandHandler.java    | 345 ++++++-
 .../optimized/OptimizedMarshallerUtils.java     |   2 +-
 .../processors/cache/GridCacheAdapter.java      |  88 +-
 .../GridCachePartitionExchangeManager.java      |  29 +-
 .../processors/cache/GridCacheProcessor.java    | 103 ++-
 .../cache/GridCacheSharedContext.java           |  19 +-
 .../processors/cache/GridCacheUtils.java        |   4 +-
 ...eoutOnPartitionMapExchangeChangeMessage.java | 128 +++
 ...TimeoutOnPartitionMapExchangeChangeTask.java |  57 ++
 .../distributed/GridCacheTxFinishSync.java      |   3 +-
 .../distributed/GridDistributedTxMapping.java   |   2 +-
 .../GridDistributedTxRemoteAdapter.java         |  23 +-
 .../distributed/dht/GridDhtLockFuture.java      | 363 ++++----
 .../dht/GridDhtTransactionalCacheAdapter.java   |  48 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  29 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  | 187 +++-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   2 +-
 .../dht/GridDhtTxPrepareRequest.java            |  36 +-
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../colocated/GridDhtColocatedLockFuture.java   | 121 ++-
 .../GridDhtPartitionsExchangeFuture.java        |  25 +-
 .../distributed/near/GridNearLockFuture.java    | 115 ++-
 ...arOptimisticSerializableTxPrepareFuture.java |   6 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   3 +-
 .../near/GridNearTxFastFinishFuture.java        |  12 +-
 .../near/GridNearTxFinishFuture.java            |  96 +-
 .../cache/distributed/near/GridNearTxLocal.java | 395 +++++---
 .../cache/local/GridLocalLockFuture.java        |  80 +-
 .../GridCacheDatabaseSharedManager.java         |   1 -
 .../store/GridCacheStoreManagerAdapter.java     |   5 +
 .../transactions/IgniteTransactionsImpl.java    |  36 +-
 .../cache/transactions/IgniteTxAdapter.java     |  19 +-
 .../cache/transactions/IgniteTxHandler.java     |   7 +-
 .../transactions/IgniteTxLocalAdapter.java      |  20 +-
 .../cache/transactions/IgniteTxManager.java     | 121 ++-
 .../transactions/TransactionProxyImpl.java      |   9 +
 .../TransactionProxyRollbackOnlyImpl.java       |  80 ++
 .../internal/util/GridPartitionStateMap.java    |   2 +-
 .../ignite/internal/visor/tx/VisorTxInfo.java   | 130 +++
 .../internal/visor/tx/VisorTxOperation.java     |  43 +
 .../internal/visor/tx/VisorTxProjection.java    |  42 +
 .../internal/visor/tx/VisorTxSortOrder.java     |  55 ++
 .../ignite/internal/visor/tx/VisorTxTask.java   | 248 +++++
 .../internal/visor/tx/VisorTxTaskArg.java       | 205 +++++
 .../internal/visor/tx/VisorTxTaskResult.java    |  80 ++
 .../ignite/mxbean/TransactionsMXBean.java       |  99 ++
 .../apache/ignite/transactions/Transaction.java |  18 +-
 .../resources/META-INF/classnames.properties    |   7 +
 .../internal/TestRecordingCommunicationSpi.java |   3 +-
 .../internal/TransactionsMXBeanImplTest.java    | 118 +++
 .../commandline/CommandHandlerParsingTest.java  | 100 ++
 .../SetTxTimeoutOnPartitionMapExchangeTest.java | 166 ++++
 .../CacheLateAffinityAssignmentTest.java        |   5 -
 .../IgniteTxRemoveTimeoutObjectsNearTest.java   |  30 +
 .../IgniteTxRemoveTimeoutObjectsTest.java       |   6 +
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |   2 +-
 .../dht/IgniteCacheTxRecoveryRollbackTest.java  |   2 +-
 .../near/GridCacheNearTxMultiNodeSelfTest.java  |   2 +-
 .../GridCachePartitionedTxSalvageSelfTest.java  |   8 +-
 .../GridCacheLocalTxMultiThreadedSelfTest.java  |   2 +-
 .../cache/transactions/TxLabelTest.java         |  63 ++
 .../transactions/TxMultiCacheAsyncOpsTest.java  | 136 +++
 ...OptimisticPrepareOnUnstableTopologyTest.java | 250 +++++
 .../TxPessimisticDeadlockDetectionTest.java     |   2 +-
 .../TxRollbackAsyncNearCacheTest.java           |  28 +
 .../cache/transactions/TxRollbackAsyncTest.java | 919 +++++++++++++++++++
 .../TxRollbackAsyncWithPersistenceTest.java     |  59 ++
 ...ollbackOnTimeoutNoDeadlockDetectionTest.java |   6 +-
 .../transactions/TxRollbackOnTimeoutTest.java   | 118 ++-
 .../TxRollbackOnTopologyChangeTest.java         | 228 +++++
 .../igfs/IgfsDataManagerSelfTest.java           |   6 +-
 .../cache/GridAbstractCacheStoreSelfTest.java   |   7 +-
 .../multijvm/IgniteClusterProcessProxy.java     |   5 +
 .../ignite/testsuites/IgniteBasicTestSuite.java |   4 +
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 .../testsuites/IgniteCacheTestSuite6.java       |  18 +
 .../ignite/util/GridCommandHandlerTest.java     | 247 +++++
 .../processors/cache/jta/CacheJtaManager.java   |   3 +-
 .../ApiParity/ClusterParityTest.cs              |   1 +
 .../ApiParity/TransactionsParityTest.cs         |  13 +-
 .../Cache/CacheAbstractTransactionalTest.cs     |  18 +-
 90 files changed, 5599 insertions(+), 809 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
index e587bd7..be2211f 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -147,6 +147,11 @@ public class TestTransaction implements Transaction {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public String label() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void resume() throws IgniteException {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index 7329d68..b501333 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -421,6 +421,13 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
     public void enableStatistics(Collection<String> caches, boolean enabled);
 
     /**
+     * Sets transaction timeout on partition map exchange.
+     *
+     * @param timeout Transaction timeout on partition map exchange in milliseconds.
+     */
+    public void setTxTimeoutOnPartitionMapExchange(long timeout);
+
+    /**
      * If local client node disconnected from cluster returns future
      * that will be completed when client reconnected.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
index dfe6a1a..2bb7101 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite;
 
+import java.util.Collection;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.transactions.Transaction;
@@ -103,4 +104,24 @@ public interface IgniteTransactions {
      * Resets transaction metrics.
      */
     public void resetMetrics();
-}
\ No newline at end of file
+
+    /**
+     * Returns a list of active transactions initiated by this node.
+     * <p>
+     * Note: returned transaction handle will only support getters, {@link Transaction#close()},
+     * {@link Transaction#rollback()}, {@link Transaction#rollbackAsync()} methods.
+     * Trying to invoke other methods will lead to UnsupportedOperationException.
+     *
+     * @return Transactions started on local node.
+     */
+    public Collection<Transaction> localActiveTransactions();
+
+    /**
+     * Returns instance of Ignite Transactions to mark a transaction with a special label.
+     *
+     * @param lb label.
+     * @return {@code This} for chaining.
+     * @throws NullPointerException if label is null.
+     */
+    public IgniteTransactions withLabel(String lb);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
index 0063afc..818f823 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
@@ -42,6 +42,9 @@ public class TransactionConfiguration implements Serializable {
     /** Default transaction timeout. */
     public static final long DFLT_TRANSACTION_TIMEOUT = 0;
 
+    /** Transaction timeout on partition map synchronization. */
+    public static final long TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE = 0;
+
     /** Default size of pessimistic transactions log. */
     public static final int DFLT_PESSIMISTIC_TX_LOG_LINGER = 10_000;
 
@@ -57,6 +60,9 @@ public class TransactionConfiguration implements Serializable {
     /** Default transaction timeout. */
     private long dfltTxTimeout = DFLT_TRANSACTION_TIMEOUT;
 
+    /** Transaction timeout on partition map exchange. */
+    private volatile long txTimeoutOnPartitionMapExchange = TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE;
+
     /** Pessimistic tx log size. */
     private int pessimisticTxLogSize;
 
@@ -89,6 +95,7 @@ public class TransactionConfiguration implements Serializable {
         dfltConcurrency = cfg.getDefaultTxConcurrency();
         dfltIsolation = cfg.getDefaultTxIsolation();
         dfltTxTimeout = cfg.getDefaultTxTimeout();
+        txTimeoutOnPartitionMapExchange = cfg.getTxTimeoutOnPartitionMapExchange();
         pessimisticTxLogLinger = cfg.getPessimisticTxLogLinger();
         pessimisticTxLogSize = cfg.getPessimisticTxLogSize();
         txSerEnabled = cfg.isTxSerializableEnabled();
@@ -192,6 +199,40 @@ public class TransactionConfiguration implements Serializable {
     }
 
     /**
+     * Some Ignite operations provoke partition map exchange process within Ignite to ensure the partitions distribution
+     * state is synchronized cluster-wide. Topology update events and a start of a new distributed cache are examples
+     * of those operations.
+     * <p>
+     * When the partition map exchange starts, Ignite acquires a global lock at a particular stage. The lock can't be
+     * obtained until pending transactions are running in parallel. If there is a transaction that runs for a while,
+     * then it will prevent the partition map exchange process from the start freezing some operations such as a new
+     * node join process.
+     * <p>
+     * This property allows to rollback such long transactions to let Ignite acquire the lock faster and initiate the
+     * partition map exchange process. The timeout is enforced only at the time of the partition map exchange process.
+     * <p>
+     * If not set, default value is {@link #TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE} which means transactions will never be
+     * rolled back on partition map exchange.
+     *
+     * @return Transaction timeout for partition map synchronization in milliseconds.
+     */
+    public long getTxTimeoutOnPartitionMapExchange() {
+        return txTimeoutOnPartitionMapExchange;
+    }
+
+    /**
+     * Sets the transaction timeout that will be enforced if the partition map exchange process starts.
+     *
+     * @param txTimeoutOnPartitionMapExchange Transaction timeout value in milliseconds.
+     * @return {@code this} for chaining.
+     */
+    public TransactionConfiguration setTxTimeoutOnPartitionMapExchange(long txTimeoutOnPartitionMapExchange) {
+        this.txTimeoutOnPartitionMapExchange = txTimeoutOnPartitionMapExchange;
+
+        return this;
+    }
+
+    /**
      * Gets size of pessimistic transactions log stored on node in order to recover transaction commit if originating
      * node has left grid before it has sent all messages to transaction nodes.
      * <p>

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
new file mode 100644
index 0000000..6937ebd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.tx.VisorTxInfo;
+import org.apache.ignite.internal.visor.tx.VisorTxOperation;
+import org.apache.ignite.internal.visor.tx.VisorTxProjection;
+import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
+import org.apache.ignite.internal.visor.tx.VisorTxTask;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+
+/**
+ * TransactionsMXBean implementation.
+ */
+public class TransactionsMXBeanImpl implements TransactionsMXBean {
+    /** */
+    private final GridKernalContextImpl ctx;
+
+    /**
+     * @param ctx Context.
+     */
+    TransactionsMXBeanImpl(GridKernalContextImpl ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getActiveTransactions(Long minDuration, Integer minSize, String prj, String consistentIds,
+        String xid, String lbRegex, Integer limit, String order, boolean detailed, boolean kill) {
+        try {
+            IgniteCompute compute = ctx.cluster().get().compute();
+
+            VisorTxProjection proj = null;
+
+            if (prj != null) {
+                if ("clients".equals(prj))
+                    proj = VisorTxProjection.CLIENT;
+                else if ("servers".equals(prj))
+                    proj = VisorTxProjection.SERVER;
+            }
+
+            List<String> consIds = null;
+
+            if (consistentIds != null)
+                consIds = Arrays.stream(consistentIds.split(",")).collect(Collectors.toList());
+
+            VisorTxSortOrder sortOrder = null;
+
+            if (order != null) {
+                if ("DURATION".equals(order))
+                    sortOrder = VisorTxSortOrder.DURATION;
+                else if ("SIZE".equals(order))
+                    sortOrder = VisorTxSortOrder.SIZE;
+            }
+
+            VisorTxTaskArg arg = new VisorTxTaskArg(kill ? VisorTxOperation.KILL : VisorTxOperation.LIST,
+                limit, minDuration == null ? null : minDuration * 1000, minSize, null, proj, consIds, xid, lbRegex, sortOrder);
+
+            Map<ClusterNode, VisorTxTaskResult> res = compute.execute(new VisorTxTask(),
+                new VisorTaskArgument<>(ctx.cluster().get().localNode().id(), arg, false));
+
+            if (detailed) {
+                StringWriter sw = new StringWriter();
+
+                PrintWriter w = new PrintWriter(sw);
+
+                for (Map.Entry<ClusterNode, VisorTxTaskResult> entry : res.entrySet()) {
+                    if (entry.getValue().getInfos().isEmpty())
+                        continue;
+
+                    ClusterNode key = entry.getKey();
+
+                    w.println(key.toString());
+
+                    for (VisorTxInfo info : entry.getValue().getInfos())
+                        w.println("    Tx: [xid=" + info.getXid() +
+                            ", label=" + info.getLabel() +
+                            ", state=" + info.getState() +
+                            ", duration=" + info.getDuration() / 1000 +
+                            ", isolation=" + info.getIsolation() +
+                            ", concurrency=" + info.getConcurrency() +
+                            ", timeout=" + info.getTimeout() +
+                            ", size=" + info.getSize() +
+                            ", dhtNodes=" + F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() {
+                            @Override public String apply(UUID id) {
+                                return U.id8(id);
+                            }
+                        }) +
+                            ']');
+                }
+
+                w.flush();
+
+                return sw.toString();
+            }
+            else {
+                int cnt = 0;
+
+                for (VisorTxTaskResult result : res.values())
+                    cnt += result.getInfos().size();
+
+                return Integer.toString(cnt);
+            }
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getTxTimeoutOnPartitionMapExchange() {
+        return ctx.config().getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTxTimeoutOnPartitionMapExchange(long timeout) {
+        try {
+            ctx.grid().context().cache().setTxTimeoutOnPartitionMapExchange(timeout);
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TransactionsMXBeanImpl.class, this);
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index 43e97b5..98fb8ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -172,6 +172,11 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
     }
 
     /** {@inheritDoc} */
+    @Override public void setTxTimeoutOnPartitionMapExchange(long timeout) {
+        cluster.setTxTimeoutOnPartitionMapExchange(timeout);
+    }
+
+    /** {@inheritDoc} */
     @Override public Ignite ignite() {
         return cluster.ignite();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index b69923b..e28342c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -502,6 +502,21 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
     }
 
     /** {@inheritDoc} */
+    @Override public void setTxTimeoutOnPartitionMapExchange(long timeout) {
+        guard();
+
+        try {
+            ctx.cache().setTxTimeoutOnPartitionMapExchange(timeout);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteCluster withAsync() {
         return new IgniteClusterAsyncImpl(this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
index 23c8eec..660edf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.commandline;
 
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+
 /**
  * Bean with all parsed and validated arguments.
  */
@@ -49,6 +51,9 @@ public class Arguments {
      */
     private String baselineArgs;
 
+    /** Transaction arguments. */
+    private final VisorTxTaskArg txArg;
+
     /**
      * @param cmd Command.
      * @param host Host.
@@ -57,11 +62,12 @@ public class Arguments {
      * @param pwd Password.
      * @param baselineAct Baseline action.
      * @param baselineArgs Baseline args.
+     * @param txArg TX arg.
      * @param force Force flag.
      */
     public Arguments(Command cmd, String host, String port, String user, String pwd,
         String baselineAct, String baselineArgs,
-        boolean force
+        VisorTxTaskArg txArg, boolean force
     ) {
         this.cmd = cmd;
         this.host = host;
@@ -71,6 +77,7 @@ public class Arguments {
         this.baselineAct = baselineAct;
         this.baselineArgs = baselineArgs;
         this.force = force;
+        this.txArg = txArg;
     }
 
     /**
@@ -123,6 +130,13 @@ public class Arguments {
     }
 
     /**
+     * @return Transaction arguments.
+     */
+    public VisorTxTaskArg transactionArguments() {
+        return txArg;
+    }
+
+    /**
      * @return Force option.
      */
     public boolean force() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
index e73a24f..c8c7db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
@@ -31,7 +31,10 @@ public enum Command {
     STATE("--state"),
 
     /** */
-    BASELINE("--baseline");
+    BASELINE("--baseline"),
+
+    /** */
+    TX("--tx");
 
     /** */
     private final String text;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 5993f59..ba267d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -24,6 +24,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
+import java.util.UUID;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.client.GridClientAuthenticationException;
 import org.apache.ignite.internal.client.GridClientClosedException;
@@ -39,15 +44,23 @@ import org.apache.ignite.internal.client.GridServerUnreachableException;
 import org.apache.ignite.internal.client.impl.connection.GridClientConnectionResetException;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
 import org.apache.ignite.internal.visor.baseline.VisorBaselineNode;
 import org.apache.ignite.internal.visor.baseline.VisorBaselineOperation;
 import org.apache.ignite.internal.visor.baseline.VisorBaselineTask;
 import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskArg;
 import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskResult;
+import org.apache.ignite.internal.visor.tx.VisorTxInfo;
+import org.apache.ignite.internal.visor.tx.VisorTxOperation;
+import org.apache.ignite.internal.visor.tx.VisorTxProjection;
+import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
+import org.apache.ignite.internal.visor.tx.VisorTxTask;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.security.SecurityCredentialsBasicProvider;
-import org.jetbrains.annotations.NotNull;
 
 import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
 import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
@@ -55,6 +68,7 @@ import static org.apache.ignite.internal.commandline.Command.ACTIVATE;
 import static org.apache.ignite.internal.commandline.Command.BASELINE;
 import static org.apache.ignite.internal.commandline.Command.DEACTIVATE;
 import static org.apache.ignite.internal.commandline.Command.STATE;
+import static org.apache.ignite.internal.commandline.Command.TX;
 import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.ADD;
 import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.COLLECT;
 import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.REMOVE;
@@ -65,6 +79,9 @@ import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.V
  * Class that execute several commands passed via command line.
  */
 public class CommandHandler {
+    /** Logger. */
+    private static final Logger log = Logger.getLogger(CommandHandler.class.getName());
+
     /** */
     static final String DFLT_HOST = "127.0.0.1";
 
@@ -87,6 +104,9 @@ public class CommandHandler {
     private static final String CMD_USER = "--user";
 
     /** */
+    public static final String CONFIRM_MSG = "yes";
+
+    /** */
     private static final String BASELINE_ADD = "add";
 
     /** */
@@ -126,11 +146,44 @@ public class CommandHandler {
     private static final Scanner IN = new Scanner(System.in);
 
     /** */
+    private static final String TX_LIMIT = "limit";
+
+    /** */
+    private static final String TX_ORDER = "order";
+
+    /** */
+    private static final String TX_SERVERS = "servers";
+
+    /** */
+    private static final String TX_CLIENTS = "clients";
+
+    /** */
+    private static final String TX_DURATION = "minDuration";
+
+    /** */
+    private static final String TX_SIZE = "minSize";
+
+    /** */
+    private static final String TX_LABEL = "label";
+
+    /** */
+    private static final String TX_NODES = "nodes";
+
+    /** */
+    private static final String TX_XID = "xid";
+
+    /** */
+    private static final String TX_KILL = "kill";
+
+    /** */
     private Iterator<String> argsIt;
 
     /** */
     private String peekedArg;
 
+    /** */
+    private Object lastOperationResult;
+
     /**
      * Output specified string to console.
      *
@@ -198,7 +251,7 @@ public class CommandHandler {
         if (prompt == null)
             return true;
 
-        return "y".equalsIgnoreCase(readLine(prompt));
+        return CONFIRM_MSG.equalsIgnoreCase(readLine(prompt));
     }
 
     /**
@@ -219,9 +272,15 @@ public class CommandHandler {
             case BASELINE:
                 if (!BASELINE_COLLECT.equals(args.baselineAction()))
                     str = "Warning: the command will perform changes in baseline.";
+                break;
+
+            case TX:
+                if (args.transactionArguments().getOperation() == VisorTxOperation.KILL)
+                    str = "Warning: the command will kill some transactions.";
+                break;
         }
 
-        return str == null ? null : str + "\nPress 'y' to continue...";
+        return str == null ? null : str + "\nPress 'yes' to continue . . . ";
     }
 
     /**
@@ -301,14 +360,39 @@ public class CommandHandler {
     }
 
     /**
+     * @param client Client.
+     * @param arg Task argument.
+     * @return Task result.
+     * @throws GridClientException If failed to execute task.
+     */
+    private Map<UUID, VisorTxTaskResult> executeTransactionsTask(GridClient client,
+        VisorTxTaskArg arg) throws GridClientException {
+
+        return executeTask(client, VisorTxTask.class, arg);
+    }
+
+    /**
      *
-     * @param client Client
+     * @param client Client.
+     * @param taskCls Task class.
+     * @param taskArgs Task arguments.
      * @return Task result.
      * @throws GridClientException If failed to execute task.
      */
     private <R> R executeTask(GridClient client, Class<?> taskCls, Object taskArgs) throws GridClientException {
         GridClientCompute compute = client.compute();
 
+        GridClientNode node = getBalancedNode(compute);
+
+        return compute.execute(taskCls.getName(),
+            new VisorTaskArgument<>(node.nodeId(), taskArgs, false));
+    }
+
+    /**
+     * @param compute instance
+     * @return balanced node
+     */
+    private GridClientNode getBalancedNode(GridClientCompute compute) throws GridClientException {
         List<GridClientNode> nodes = new ArrayList<>();
 
         for (GridClientNode node : compute.nodes())
@@ -318,10 +402,7 @@ public class CommandHandler {
         if (F.isEmpty(nodes))
             throw new GridClientDisconnectedException("Connectable node not found", null);
 
-        GridClientNode node = compute.balancer().balancedNode(nodes);
-
-        return compute.projection(node).execute(taskCls.getName(),
-            new VisorTaskArgument<>(node.nodeId(), taskArgs, false));
+        return compute.balancer().balancedNode(nodes);
     }
 
     /**
@@ -368,13 +449,7 @@ public class CommandHandler {
             case ADD:
             case REMOVE:
             case SET:
-                if(F.isEmpty(s))
-                    throw new IllegalArgumentException("Empty list of consistent IDs");
-
-                List<String> consistentIds = new ArrayList<>();
-
-                for (String consistentId : s.split(","))
-                    consistentIds.add(consistentId.trim());
+                List<String> consistentIds = getConsistentIds(s);
 
                 return new VisorBaselineTaskArg(op, -1, consistentIds);
 
@@ -394,6 +469,22 @@ public class CommandHandler {
     }
 
     /**
+     * @param s String of consisted ids delimited by comma.
+     * @return List of consistent ids.
+     */
+    private List<String> getConsistentIds(String s) {
+        if (F.isEmpty(s))
+            throw new IllegalArgumentException("Empty list of consistent IDs");
+
+        List<String> consistentIds = new ArrayList<>();
+
+        for (String consistentId : s.split(","))
+            consistentIds.add(consistentId.trim());
+
+        return consistentIds;
+    }
+
+    /**
      * Print baseline topology.
      *
      * @param res Task result with baseline topology.
@@ -404,6 +495,7 @@ public class CommandHandler {
         nl();
 
         Map<String, VisorBaselineNode> baseline = res.getBaseline();
+
         Map<String, VisorBaselineNode> servers = res.getServers();
 
         if (F.isEmpty(baseline))
@@ -532,6 +624,57 @@ public class CommandHandler {
     }
 
     /**
+     * Dump transactions information.
+     *
+     * @param client Client.
+     * @param arg Transaction search arguments
+     */
+    private void transactions(GridClient client, VisorTxTaskArg arg) throws GridClientException {
+        try {
+            Map<ClusterNode, VisorTxTaskResult> res = executeTask(client, VisorTxTask.class, arg);
+
+            lastOperationResult = res;
+
+            if (res.isEmpty())
+                log("Nothing found.");
+            else if (arg.getOperation() == VisorTxOperation.KILL)
+                log("Killed transactions:");
+            else
+                log("Matching transactions:");
+
+            for (Map.Entry<ClusterNode, VisorTxTaskResult> entry : res.entrySet()) {
+                if (entry.getValue().getInfos().isEmpty())
+                    continue;
+
+                ClusterNode key = entry.getKey();
+
+                log(key.toString());
+
+                for (VisorTxInfo info : entry.getValue().getInfos())
+                    log("    Tx: [xid=" + info.getXid() +
+                        ", label=" + info.getLabel() +
+                        ", state=" + info.getState() +
+                        ", duration=" + info.getDuration() / 1000 +
+                        ", isolation=" + info.getIsolation() +
+                        ", concurrency=" + info.getConcurrency() +
+                        ", timeout=" + info.getTimeout() +
+                        ", size=" + info.getSize() +
+                        ", dhtNodes=" + F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() {
+                        @Override public String apply(UUID id) {
+                            return U.id8(id);
+                        }
+                    }) +
+                        ']');
+            }
+        }
+        catch (Throwable e) {
+            log("Failed to perform operation.");
+
+            throw e;
+        }
+    }
+
+    /**
      * @param e Exception to check.
      * @return {@code true} if specified exception is {@link GridClientAuthenticationException}.
      */
@@ -603,7 +746,7 @@ public class CommandHandler {
      * @return Arguments bean.
      * @throws IllegalArgumentException In case arguments aren't valid.
      */
-    @NotNull Arguments parseAndValidate(List<String> rawArgs) {
+    Arguments parseAndValidate(List<String> rawArgs) {
         String host = DFLT_HOST;
 
         String port = DFLT_PORT;
@@ -622,6 +765,8 @@ public class CommandHandler {
 
         initArgIterator(rawArgs);
 
+        VisorTxTaskArg txArgs = null;
+
         while (hasNextArg()) {
             String str = nextArg("").toLowerCase();
 
@@ -632,7 +777,14 @@ public class CommandHandler {
                     case ACTIVATE:
                     case DEACTIVATE:
                     case STATE:
-                        commands.add(Command.of(str));
+                        commands.add(cmd);
+                        break;
+
+                    case TX:
+                        commands.add(TX);
+
+                        txArgs = parseTransactionArguments();
+
                         break;
 
                     case BASELINE:
@@ -652,6 +804,11 @@ public class CommandHandler {
                                 baselineArgs = nextArg("Expected baseline arguments");
                             }
                         }
+
+                        break;
+
+                    default:
+                        throw new IllegalArgumentException("Unexpected command: " + str);
                 }
             }
             else {
@@ -685,6 +842,7 @@ public class CommandHandler {
                     case CMD_FORCE:
                         force = true;
                         break;
+
                     default:
                         throw new IllegalArgumentException("Unexpected argument: " + str);
                 }
@@ -707,7 +865,138 @@ public class CommandHandler {
         if (hasUsr != hasPwd)
             throw new IllegalArgumentException("Both user and password should be specified");
 
-        return new Arguments(cmd, host, port, user, pwd, baselineAct, baselineArgs, force);
+        return new Arguments(cmd, host, port, user, pwd, baselineAct, baselineArgs, txArgs, force);
+    }
+
+    /**
+     * @return Transaction arguments.
+     */
+    private VisorTxTaskArg parseTransactionArguments() {
+        VisorTxProjection proj = null;
+
+        Integer limit = null;
+
+        VisorTxSortOrder sortOrder = null;
+
+        Long duration = null;
+
+        Integer size = null;
+
+        String lbRegex = null;
+
+        List<String> consistentIds = null;
+
+        VisorTxOperation op = VisorTxOperation.LIST;
+
+        String xid = null;
+
+        boolean end = false;
+
+        do {
+            String str = peekNextArg();
+
+            if (str == null)
+                break;
+
+            switch (str) {
+                case TX_LIMIT:
+                    nextArg("");
+
+                    limit = (int) nextLongArg(TX_LIMIT);
+                    break;
+
+                case TX_ORDER:
+                    nextArg("");
+
+                    sortOrder = VisorTxSortOrder.fromString(nextArg(TX_ORDER));
+
+                    break;
+
+                case TX_SERVERS:
+                    nextArg("");
+
+                    proj = VisorTxProjection.SERVER;
+                    break;
+
+                case TX_CLIENTS:
+                    nextArg("");
+
+                    proj = VisorTxProjection.CLIENT;
+                    break;
+
+                case TX_NODES:
+                    nextArg("");
+
+                    consistentIds = getConsistentIds(nextArg(TX_NODES));
+                    break;
+
+                case TX_DURATION:
+                    nextArg("");
+
+                    duration = nextLongArg(TX_DURATION) * 1000L;
+                    break;
+
+                case TX_SIZE:
+                    nextArg("");
+
+                    size = (int) nextLongArg(TX_SIZE);
+                    break;
+
+                case TX_LABEL:
+                    nextArg("");
+
+                    lbRegex = nextArg(TX_LABEL);
+
+                    try {
+                        Pattern.compile(lbRegex);
+                    }
+                    catch (PatternSyntaxException e) {
+                        throw new IllegalArgumentException("Illegal regex syntax");
+                    }
+
+                    break;
+
+                case TX_XID:
+                    nextArg("");
+
+                    xid = nextArg(TX_XID);
+                    break;
+
+                case TX_KILL:
+                    nextArg("");
+
+                    op = VisorTxOperation.KILL;
+                    break;
+
+                default:
+                    end = true;
+            }
+        }
+        while (!end);
+
+        if (proj != null && consistentIds != null)
+            throw new IllegalArgumentException("Projection can't be used together with list of consistent ids.");
+
+        return new VisorTxTaskArg(op, limit, duration, size, null, proj, consistentIds, xid, lbRegex, sortOrder);
+    }
+
+    /**
+     * @return Numeric value.
+     */
+    private long nextLongArg(String lb) {
+        String str = nextArg("Expecting " + lb);
+
+        try {
+            long val = Long.parseLong(str);
+
+            if (val < 0)
+                throw new IllegalArgumentException("Invalid value for " + lb + ": " + val);
+
+            return val;
+        }
+        catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid value for " + lb + ": " + str);
+        }
     }
 
     /**
@@ -734,8 +1023,11 @@ public class CommandHandler {
                 usage("  Remove nodes from baseline topology:", BASELINE, " remove consistentId1[,consistentId2,....,consistentIdN] [--force]");
                 usage("  Set baseline topology:", BASELINE, " set consistentId1[,consistentId2,....,consistentIdN] [--force]");
                 usage("  Set baseline topology based on version:", BASELINE, " version topologyVersion [--force]");
+                usage("  List or kill transactions:", TX, " [xid XID] [minDuration SECONDS] " +
+                    "[minSize SIZE] [label PATTERN_REGEX] [servers|clients] " +
+                    "[nodes consistentId1[,consistentId2,....,consistentIdN] [limit NUMBER] [order DURATION|SIZE] [kill] [--force]");
 
-                log("By default cluster deactivation and changes in baseline topology commands request interactive confirmation. ");
+                log("By default commands affecting the cluster require interactive confirmation. ");
                 log("  --force option can be used to execute commands without prompting for confirmation.");
                 nl();
 
@@ -757,7 +1049,7 @@ public class CommandHandler {
             Arguments args = parseAndValidate(rawArgs);
 
             if (!confirm(args)) {
-                log("Operation canceled.");
+                log("Operation cancelled.");
 
                 return EXIT_CODE_OK;
             }
@@ -789,6 +1081,10 @@ public class CommandHandler {
                     case BASELINE:
                         baseline(client, args.baselineAction(), args.baselineArguments());
                         break;
+
+                    case TX:
+                        transactions(client, args.transactionArguments());
+                        break;
                 }
             }
 
@@ -816,5 +1112,14 @@ public class CommandHandler {
 
         System.exit(hnd.execute(Arrays.asList(args)));
     }
+
+    /**
+     * Used for tests.
+     * @return Last operation result;
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getLastOperationResult() {
+        return (T)lastOperationResult;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
index aa4bfd6..4626df7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -203,7 +203,7 @@ class OptimizedMarshallerUtils {
             try {
                 registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName());
             }
-            catch (IgniteCheckedException e) {
+            catch (Exception e) {
                 throw new IOException("Failed to register class: " + cls.getName(), e);
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index bd613a1..44a64f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4081,7 +4081,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     READ_COMMITTED,
                     tCfg.getDefaultTxTimeout(),
                     !ctx.skipStore(),
-                    0
+                    0,
+                    null
                 );
 
                 assert tx != null;
@@ -4105,7 +4106,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                 tx.xid(), e);
                         }
                         catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
-                            U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+                            U.error(log, "Failed to rollback transaction (cache may contain stale locks): " +
+                                CU.txString(tx), e1);
 
                             if (e != e1)
                                 e.addSuppressed(e1);
@@ -4180,7 +4182,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     READ_COMMITTED,
                     txCfg.getDefaultTxTimeout(),
                     !skipStore,
-                    0);
+                    0,
+                    null);
 
                 return asyncOp(tx, op, opCtx, /*retry*/false);
             }
@@ -4224,6 +4227,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
             final GridNearTxLocal tx0 = tx;
 
+            final CX1 clo = new CX1<IgniteInternalFuture<T>, T>() {
+                @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
+                    try {
+                        return tFut.get();
+                    }
+                    catch (IgniteTxTimeoutCheckedException | IgniteTxRollbackCheckedException | NodeStoppingException e) {
+                        throw e;
+                    }
+                    catch (IgniteCheckedException e1) {
+                        try {
+                            tx0.rollbackNearTxLocalAsync();
+                        }
+                        catch (Throwable e2) {
+                            if (e1 != e2)
+                                e1.addSuppressed(e2);
+                        }
+
+                        throw e1;
+                    }
+                    finally {
+                        ctx.shared().txContextReset();
+                    }
+                }
+            };
+
             if (fut != null && !fut.isDone()) {
                 IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
                     new IgniteOutClosure<IgniteInternalFuture>() {
@@ -4233,31 +4261,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                     new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
 
                             try {
-                                return op.op(tx0, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() {
-                                    @Override
-                                    public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
-                                        try {
-                                            return tFut.get();
-                                        }
-                                        catch (IgniteTxRollbackCheckedException | NodeStoppingException e) {
-                                            throw e;
-                                        }
-                                        catch (IgniteCheckedException e1) {
-                                            try {
-                                                tx0.rollbackNearTxLocalAsync();
-                                            }
-                                            catch (Throwable e2) {
-                                                if (e1 != e2)
-                                                    e1.addSuppressed(e2);
-                                            }
-
-                                            throw e1;
-                                        }
-                                        finally {
-                                            ctx.shared().txContextReset();
-                                        }
-                                    }
-                                });
+                                return op.op(tx0, opCtx).chain(clo);
                             }
                             finally {
                                 // It is necessary to clear tx context in this thread as well.
@@ -4274,30 +4278,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             IgniteInternalFuture<T> f;
 
             try {
-                f = op.op(tx, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() {
-                    @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
-                        try {
-                            return tFut.get();
-                        }
-                        catch (IgniteTxRollbackCheckedException | NodeStoppingException e) {
-                            throw e;
-                        }
-                        catch (IgniteCheckedException e1) {
-                            try {
-                                tx0.rollbackNearTxLocalAsync(e1 instanceof IgniteTxTimeoutCheckedException);
-                            }
-                            catch (Throwable e2) {
-                                if (e2 != e1)
-                                    e1.addSuppressed(e2);
-                            }
-
-                            throw e1;
-                        }
-                        finally {
-                            ctx.shared().txContextReset();
-                        }
-                    }
-                });
+                f = op.op(tx, opCtx).chain(clo);
             }
             finally {
                 // It is necessary to clear tx context in this thread as well.
@@ -4858,7 +4839,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 READ_COMMITTED,
                 CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(),
                 opCtx == null || !opCtx.skipStore(),
-                0);
+                0,
+                null);
 
             IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx, retry);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7ea161a..90f57c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -47,6 +46,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -2417,23 +2417,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             int dumpCnt = 0;
 
-                            final long futTimeout = 2 * cctx.gridConfig().getNetworkTimeout();
+                            IgniteConfiguration cfg = cctx.gridConfig();
+
+                            long rollbackTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
+                            final long dumpTimeout = 2 * cctx.gridConfig().getNetworkTimeout();
 
                             long nextDumpTime = 0;
 
                             while (true) {
                                 try {
-                                    resVer = exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
+                                    resVer = exchFut.get(rollbackTimeout > 0 ? rollbackTimeout : dumpTimeout);
 
                                     break;
                                 }
                                 catch (IgniteFutureTimeoutCheckedException ignored) {
-                                    U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
-                                        "topVer=" + exchFut.initialVersion() +
-                                        ", node=" + cctx.localNodeId() + "]. " +
-                                        "Dumping pending objects that might be the cause: ");
-
                                     if (nextDumpTime <= U.currentTimeMillis()) {
+                                        U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
+                                            "topVer=" + exchFut.initialVersion() +
+                                            ", node=" + cctx.localNodeId() + "]. " +
+                                            (rollbackTimeout == 0 ? "Consider changing TransactionConfiguration.txTimeoutOnPartitionMapSynchronization to non default value to avoid this message. " : "") +
+                                            "Dumping pending objects that might be the cause: ");
+
                                         try {
                                             dumpDebugInfo(exchFut);
                                         }
@@ -2441,7 +2446,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                             U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
                                         }
 
-                                        nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
+                                        nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
+                                    }
+
+                                    if (rollbackTimeout > 0) {
+                                        rollbackTimeout = 0; // Try automatic rollback only once.
+
+                                        cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion());
                                     }
                                 }
                                 catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index bceb8c7..39c7e71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -229,6 +229,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Enable/disable cache statistics futures. */
     private ConcurrentMap<UUID, EnableStatisticsFuture> enableStatisticsFuts = new ConcurrentHashMap<>();
 
+    /** The futures for changing transaction timeout on partition map exchange. */
+    private ConcurrentMap<UUID, TxTimeoutOnPartitionMapExchangeChangeFuture> txTimeoutOnPartitionMapExchangeFuts =
+        new ConcurrentHashMap<>();
+
     /** */
     private ClusterCachesInfo cachesInfo;
 
@@ -380,6 +384,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             processStatisticsModeChange(task0.message());
         }
+        else if (task instanceof TxTimeoutOnPartitionMapExchangeChangeTask) {
+            TxTimeoutOnPartitionMapExchangeChangeTask task0 = (TxTimeoutOnPartitionMapExchangeChangeTask)task;
+
+            processTxTimeoutOnPartitionMapExchangeChange(task0.message());
+        }
         else if (task instanceof StopCachesOnClientReconnectExchangeTask) {
             StopCachesOnClientReconnectExchangeTask task0 = (StopCachesOnClientReconnectExchangeTask)task;
 
@@ -683,7 +692,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         sharedCtx = createSharedContext(ctx, sessionListeners);
 
-        transactions = new IgniteTransactionsImpl(sharedCtx);
+        transactions = new IgniteTransactionsImpl(sharedCtx, null);
 
         // Start shared managers.
         for (GridCacheSharedManager mgr : sharedCtx.managers())
@@ -1057,6 +1066,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (EnableStatisticsFuture fut : enableStatisticsFuts.values())
             fut.onDone(err);
 
+        for (TxTimeoutOnPartitionMapExchangeChangeFuture fut : txTimeoutOnPartitionMapExchangeFuts.values())
+            fut.onDone(err);
+
         for (CacheGroupContext grp : cacheGrps.values())
             grp.onDisconnected(reconnectFut);
 
@@ -2618,6 +2630,43 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Callback invoked from discovery thread when discovery custom message is received.
+     *
+     * @param msg Discovery message for changing transaction timeout on partition map exchange.
+     */
+    public void onTxTimeoutOnPartitionMapExchangeChange(TxTimeoutOnPartitionMapExchangeChangeMessage msg) {
+        assert msg != null;
+
+        if (msg.isInit()) {
+            TransactionConfiguration cfg = ctx.config().getTransactionConfiguration();
+
+            if (cfg.getTxTimeoutOnPartitionMapExchange() != msg.getTimeout())
+                cfg.setTxTimeoutOnPartitionMapExchange(msg.getTimeout());
+        }
+        else {
+            TxTimeoutOnPartitionMapExchangeChangeFuture fut = txTimeoutOnPartitionMapExchangeFuts.get(
+                msg.getRequestId());
+
+            if (fut != null)
+                fut.onDone();
+        }
+    }
+
+    /**
+     * The task for changing transaction timeout on partition map exchange processed by exchange worker.
+     *
+     * @param msg Message.
+     */
+    public void processTxTimeoutOnPartitionMapExchangeChange(TxTimeoutOnPartitionMapExchangeChangeMessage msg) {
+        assert msg != null;
+
+        long timeout = ctx.config().getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
+        if (timeout != msg.getTimeout())
+            ctx.config().getTransactionConfiguration().setTxTimeoutOnPartitionMapExchange(msg.getTimeout());
+    }
+
+    /**
      * @param stoppedCaches Stopped caches.
      */
     private void stopCachesOnClientReconnect(Collection<GridCacheAdapter> stoppedCaches) {
@@ -3413,6 +3462,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (msg instanceof CacheStatisticsModeChangeMessage)
             onCacheStatisticsModeChange((CacheStatisticsModeChangeMessage)msg);
 
+        if (msg instanceof TxTimeoutOnPartitionMapExchangeChangeMessage)
+            onTxTimeoutOnPartitionMapExchangeChange((TxTimeoutOnPartitionMapExchangeChangeMessage)msg);
+
         return false;
     }
 
@@ -3963,6 +4015,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         for (EnableStatisticsFuture fut : enableStatisticsFuts.values())
             fut.onDone(err);
+
+        for (TxTimeoutOnPartitionMapExchangeChangeFuture fut : txTimeoutOnPartitionMapExchangeFuts.values())
+            fut.onDone(err);
     }
 
     /**
@@ -4392,6 +4447,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Sets transaction timeout on partition map exchange.
+     *
+     * @param timeout Transaction timeout on partition map exchange in milliseconds.
+     */
+    public void setTxTimeoutOnPartitionMapExchange(long timeout) throws IgniteCheckedException {
+        UUID requestId = UUID.randomUUID();
+
+        TxTimeoutOnPartitionMapExchangeChangeFuture fut = new TxTimeoutOnPartitionMapExchangeChangeFuture(requestId);
+
+        txTimeoutOnPartitionMapExchangeFuts.put(requestId, fut);
+
+        TxTimeoutOnPartitionMapExchangeChangeMessage msg = new TxTimeoutOnPartitionMapExchangeChangeMessage(
+            requestId, timeout);
+
+        ctx.grid().context().discovery().sendCustomEvent(msg);
+
+        fut.get();
+    }
+
+    /**
      * @param obj Object to clone.
      * @return Object copy.
      * @throws IgniteCheckedException If failed.
@@ -4624,4 +4699,30 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             return S.toString(EnableStatisticsFuture.class, this);
         }
     }
+
+    /**
+     * The future for changing transaction timeout on partition map exchange.
+     */
+    private class TxTimeoutOnPartitionMapExchangeChangeFuture extends GridFutureAdapter<Void> {
+        /** */
+        private UUID id;
+
+        /**
+         * @param id Future ID.
+         */
+        private TxTimeoutOnPartitionMapExchangeChangeFuture(UUID id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+            txTimeoutOnPartitionMapExchangeFuts.remove(id, this);
+            return super.onDone(res, err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TxTimeoutOnPartitionMapExchangeChangeFuture.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index b5c4096..b195508 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -71,6 +71,7 @@ import org.apache.ignite.plugin.PluginProvider;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY;
+import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 
 /**
  * Shared context.
@@ -955,9 +956,14 @@ public class GridCacheSharedContext<K, V> {
      * @throws IgniteCheckedException If failed.
      */
     public void endTx(GridNearTxLocal tx) throws IgniteCheckedException {
-        tx.txState().awaitLastFuture(this);
+        boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
+
+        if (clearThreadMap)
+            tx.txState().awaitLastFuture(this);
+        else
+            tx.state(MARKED_ROLLBACK);
 
-        tx.close();
+        tx.close(clearThreadMap);
     }
 
     /**
@@ -983,9 +989,14 @@ public class GridCacheSharedContext<K, V> {
      * @return Rollback future.
      */
     public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException {
-        tx.txState().awaitLastFuture(this);
+        boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
+
+        if (clearThreadMap)
+            tx.txState().awaitLastFuture(this);
+        else
+            tx.state(MARKED_ROLLBACK);
 
-        return tx.rollbackNearTxLocalAsync();
+        return tx.rollbackNearTxLocalAsync(clearThreadMap, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e244c75..bb64cc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -817,13 +817,15 @@ public class GridCacheUtils {
         if (tx == null)
             return "null";
 
-        return tx.getClass().getSimpleName() + "[id=" + tx.xid() +
+        return tx.getClass().getSimpleName() + "[xid=" + tx.xid() +
+            ", xidVersion=" + tx.xidVersion() +
             ", concurrency=" + tx.concurrency() +
             ", isolation=" + tx.isolation() +
             ", state=" + tx.state() +
             ", invalidate=" + tx.isInvalidate() +
             ", rollbackOnly=" + tx.isRollbackOnly() +
             ", nodeId=" + tx.nodeId() +
+            ", timeout=" + tx.timeout() +
             ", duration=" + (U.currentTimeMillis() - tx.startTime()) + ']';
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java
new file mode 100644
index 0000000..5589c94
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Discovery message for changing transaction timeout on partition map exchange.
+ */
+public class TxTimeoutOnPartitionMapExchangeChangeMessage implements DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Request ID. */
+    private final UUID reqId;
+
+    /** Transaction timeout on partition map exchange in milliseconds. */
+    private final long timeout;
+
+    /** Init flag. */
+    private final boolean isInit;
+
+    /**
+     * Constructor for response.
+     *
+     * @param req Request message.
+     */
+    public TxTimeoutOnPartitionMapExchangeChangeMessage(TxTimeoutOnPartitionMapExchangeChangeMessage req) {
+        this.reqId = req.reqId;
+        this.timeout = req.timeout;
+        this.isInit = false;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param reqId Request ID.
+     * @param timeout Transaction timeout on partition map exchange in milliseconds.
+     */
+    public TxTimeoutOnPartitionMapExchangeChangeMessage(UUID reqId, long timeout) {
+        this.reqId = reqId;
+        this.timeout = timeout;
+        this.isInit = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return isInit() ? new TxTimeoutOnPartitionMapExchangeChangeMessage(this) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
+        DiscoCache discoCache) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Gets request ID.
+     *
+     * @return Request ID.
+     */
+    public UUID getRequestId() {
+        return reqId;
+    }
+
+    /**
+     * Gets transaction timeout on partition map exchange in milliseconds.
+     *
+     * @return Transaction timeout on partition map exchange in milliseconds.
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Gets init flag.
+     *
+     * @return Init flag.
+     */
+    public boolean isInit() {
+        return isInit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TxTimeoutOnPartitionMapExchangeChangeMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java
new file mode 100644
index 0000000..6edfa58
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * The task for changing transaction timeout on partition map exchange.
+ */
+public class TxTimeoutOnPartitionMapExchangeChangeTask implements CachePartitionExchangeWorkerTask {
+    /** Discovery message. */
+    private final TxTimeoutOnPartitionMapExchangeChangeMessage msg;
+
+    /**
+     * Constructor.
+     *
+     * @param msg Discovery message.
+     */
+    public TxTimeoutOnPartitionMapExchangeChangeTask(TxTimeoutOnPartitionMapExchangeChangeMessage msg) {
+        assert msg != null;
+        this.msg = msg;
+    }
+
+    /**
+     * Gets discovery message.
+     *
+     * @return Discovery message.
+     */
+    public TxTimeoutOnPartitionMapExchangeChangeMessage message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipForExchangeMerge() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TxTimeoutOnPartitionMapExchangeChangeTask.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
index 02d1e8e..7cc368a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -338,4 +339,4 @@ public class GridCacheTxFinishSync<K, V> {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 45903aa..481c954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -41,7 +41,7 @@ public class GridDistributedTxMapping {
 
     /** Entries. */
     @GridToStringInclude
-    private Collection<IgniteTxEntry> entries;
+    private final Collection<IgniteTxEntry> entries;
 
     /** Explicit lock flag. */
     private boolean explicitLock;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 27044eb..a692b2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -117,6 +117,9 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     @GridToStringInclude
     protected IgniteTxRemoteState txState;
 
+    /** {@code True} if tx should skip adding itself to completed version map on finish. */
+    private boolean skipCompletedVers;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -228,8 +231,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     @Override public GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
         boolean failFast,
         KeyCacheObject key)
-        throws GridCacheFilterFailedException
-    {
+        throws GridCacheFilterFailedException {
         assert false : "Method peek can only be called on user transaction: " + this;
 
         throw new IllegalStateException("Method peek can only be called on user transaction: " + this);
@@ -869,7 +871,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
             // Note that we don't evict near entries here -
             // they will be deleted by their corresponding transactions.
             if (state(ROLLING_BACK) || state() == UNKNOWN) {
-                cctx.tm().rollbackTx(this, false);
+                cctx.tm().rollbackTx(this, false, skipCompletedVers);
 
                 state(ROLLED_BACK);
             }
@@ -899,6 +901,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @return {@code True} if tx should skip adding itself to completed version map on finish.
+     */
+    public boolean skipCompletedVersions() {
+        return skipCompletedVers;
+    }
+
+    /**
+     * @param skipCompletedVers {@code True} if tx should skip adding itself to completed version map on finish.
+     */
+    public void skipCompletedVersions(boolean skipCompletedVers) {
+        this.skipCompletedVers = skipCompletedVers;
+    }
+
+    /**
      * Adds explicit version if there is one.
      *
      * @param e Transaction entry.
@@ -925,4 +941,5 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString());
     }
+
 }