You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/08/30 11:53:58 UTC

[36/38] ignite git commit: IGNITE-4191: MVCC and transactional SQL support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov, Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov, Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
index 6c128a4..e24ecbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcTableMeta;
 import org.apache.ignite.internal.util.typedef.F;
 
 import static java.sql.Connection.TRANSACTION_NONE;
+import static java.sql.Connection.TRANSACTION_REPEATABLE_READ;
 import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
 import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
@@ -630,17 +631,19 @@ public class JdbcThinDatabaseMetadata implements DatabaseMetaData {
 
     /** {@inheritDoc} */
     @Override public int getDefaultTransactionIsolation() throws SQLException {
-        return TRANSACTION_NONE;
+        return conn.igniteVersion().greaterThanEqual(2, 5, 0) ? TRANSACTION_REPEATABLE_READ :
+            TRANSACTION_NONE;
     }
 
     /** {@inheritDoc} */
     @Override public boolean supportsTransactions() throws SQLException {
-        return false;
+        return conn.igniteVersion().greaterThanEqual(2, 5, 0);
     }
 
     /** {@inheritDoc} */
     @Override public boolean supportsTransactionIsolationLevel(int level) throws SQLException {
-        return false;
+        return conn.igniteVersion().greaterThanEqual(2, 5, 0) &&
+            TRANSACTION_REPEATABLE_READ == level;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index 30e446f..f0f7337 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.sql.SqlKeyword;
 import org.apache.ignite.internal.sql.SqlParseException;
@@ -208,7 +210,7 @@ public class JdbcThinStatement implements Statement {
         }
 
         JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, schema, pageSize,
-            maxRows, sql, args == null ? null : args.toArray(new Object[args.size()])));
+            maxRows, conn.getAutoCommit(), sql, args == null ? null : args.toArray(new Object[args.size()])));
 
         assert res0 != null;
 
@@ -646,7 +648,8 @@ public class JdbcThinStatement implements Statement {
             throw new SQLException("Batch is empty.");
 
         try {
-            JdbcBatchExecuteResult res = conn.sendRequest(new JdbcBatchExecuteRequest(conn.getSchema(), batch, false));
+            JdbcBatchExecuteResult res = conn.sendRequest(new JdbcBatchExecuteRequest(conn.getSchema(), batch,
+                conn.getAutoCommit(), false));
 
             if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
                 throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 6128d07..2c3f321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -326,6 +326,7 @@ public class JdbcThinTcpIo {
         writer.writeBoolean(connProps.isAutoCloseServerCursor());
         writer.writeBoolean(connProps.isLazy());
         writer.writeBoolean(connProps.isSkipReducerOnUpdate());
+        writer.writeString(connProps.nestedTxMode());
 
         if (!F.isEmpty(connProps.getUsername())) {
             assert ver.compareTo(VER_2_5_0) >= 0 : "Authentication is supported since 2.5";
@@ -374,8 +375,9 @@ public class JdbcThinTcpIo {
                     + ", url=" + connProps.getUrl() + ']', SqlStateCode.CONNECTION_REJECTED);
             }
 
-            if (VER_2_4_0.equals(srvProtocolVer) || VER_2_3_0.equals(srvProtocolVer) ||
-                VER_2_1_5.equals(srvProtocolVer))
+            if (VER_2_4_0.equals(srvProtocolVer)
+                    || VER_2_3_0.equals(srvProtocolVer)
+                    || VER_2_1_5.equals(srvProtocolVer))
                 handshake(srvProtocolVer);
             else if (VER_2_1_0.equals(srvProtocolVer))
                 handshake_2_1_0();
@@ -541,8 +543,8 @@ public class JdbcThinTcpIo {
 
             int cnt = !F.isEmpty(qrys) ? Math.min(MAX_BATCH_QRY_CNT, qrys.size()) : 0;
 
-            // One additional byte for last batch flag.
-            cap = cnt * DYNAMIC_SIZE_MSG_CAP + 1;
+            // Two additional bytes: one for the autocommit flag and one for the last batch flag.
+            cap = cnt * DYNAMIC_SIZE_MSG_CAP + 2;
         }
         else if (req instanceof JdbcQueryCloseRequest)
             cap = QUERY_CLOSE_MSG_SIZE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 8d9a700..f515d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -101,6 +102,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
@@ -1112,6 +1114,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             return;
         }
+        if (msg.topicOrdinal() == TOPIC_CACHE_COORDINATOR.ordinal()) {
+            MvccMessage msg0 = (MvccMessage)msg.message();
+
+            // see IGNITE-8609
+            /*if (msg0.processedFromNioThread())
+                c.run();
+            else*/
+                ctx.getStripedExecutorService().execute(-1, c);
+
+            return;
+        }
 
         if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) {
             ctx.getStripedExecutorService().execute(msg.partition(), c);
@@ -1648,6 +1661,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 if (e.getCause() instanceof ClusterTopologyCheckedException)
                     throw (ClusterTopologyCheckedException)e.getCause();
 
+                if (!ctx.discovery().alive(node))
+                    throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id());
+
                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
                     "TCP connection cannot be established due to firewall issues) " +
                     "[node=" + node + ", topic=" + topic +

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 581c32e..8dddd8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -47,13 +47,13 @@ import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult;
 import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
 import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
 import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -68,11 +68,15 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
@@ -97,6 +101,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -108,7 +113,24 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryResultsEnlistResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryCntr;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryId;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -927,6 +949,116 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 136:
+                msg = new MvccTxSnapshotRequest();
+
+                break;
+
+            case 137:
+                msg = new MvccAckRequestTx();
+
+                break;
+
+            case 138:
+                msg = new MvccFutureResponse();
+
+                break;
+
+            case 139:
+                msg = new MvccQuerySnapshotRequest();
+
+                break;
+
+            case 140:
+                msg = new MvccAckRequestQueryCntr();
+
+                break;
+
+            case 141:
+                msg = new MvccSnapshotResponse();
+
+                break;
+
+            case 142:
+                msg = new MvccWaitTxsRequest();
+
+                break;
+
+            case 143:
+                msg = new GridCacheMvccEntryInfo();
+
+                break;
+
+            case 144:
+                msg = new GridDhtTxQueryEnlistResponse();
+
+                break;
+
+            case 145:
+                msg = new MvccAckRequestQueryId();
+
+                break;
+
+            case 146:
+                msg = new MvccAckRequestTxAndQueryCntr();
+
+                break;
+
+            case 147:
+                msg = new MvccAckRequestTxAndQueryId();
+
+                break;
+
+            case 148:
+                msg = new MvccVersionImpl();
+
+                break;
+
+            case 149:
+                msg = new MvccActiveQueriesMessage();
+
+                break;
+
+            case 150:
+                msg = new MvccSnapshotWithoutTxs();
+
+                break;
+
+            case 151:
+                msg = new GridNearTxQueryEnlistRequest();
+
+                break;
+
+            case 152:
+                msg = new GridNearTxQueryEnlistResponse();
+
+                break;
+
+            case 153:
+                msg = new GridNearTxQueryResultsEnlistRequest();
+
+                break;
+
+            case 154:
+                msg = new GridNearTxQueryResultsEnlistResponse();
+
+                break;
+
+            case 155:
+                msg = new GridDhtTxQueryEnlistRequest();
+
+                break;
+
+            case 156:
+                msg = new GridDhtTxQueryFirstEnlistRequest();
+
+                break;
+
+            case 157:
+                msg = new GridDhtPartitionsUpdateCountersMap();
+
+                break;
+
             // [-3..119] [124..129] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 8cdcbf3..84bcab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -108,10 +109,14 @@ public class DiscoCache {
     /** */
     private final P1<ClusterNode> aliveNodePred;
 
+    /** */
+    private final MvccCoordinator mvccCrd;
+
     /**
      * @param topVer Topology version.
      * @param state Current cluster state.
      * @param loc Local node.
+     * @param mvccCrd MVCC coordinator node.
      * @param rmtNodes Remote nodes.
      * @param allNodes All nodes.
      * @param srvNodes Server nodes.
@@ -130,6 +135,7 @@ public class DiscoCache {
         AffinityTopologyVersion topVer,
         DiscoveryDataClusterState state,
         ClusterNode loc,
+        MvccCoordinator mvccCrd,
         List<ClusterNode> rmtNodes,
         List<ClusterNode> allNodes,
         List<ClusterNode> srvNodes,
@@ -148,6 +154,7 @@ public class DiscoCache {
         this.topVer = topVer;
         this.state = state;
         this.loc = loc;
+        this.mvccCrd = mvccCrd;
         this.rmtNodes = rmtNodes;
         this.allNodes = allNodes;
         this.srvNodes = srvNodes;
@@ -157,7 +164,7 @@ public class DiscoCache {
         this.allCacheNodes = allCacheNodes;
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
-        alives.addAll(alives0);
+        this.alives.addAll(alives0);
         this.minNodeVer = minNodeVer;
         this.minSrvNodeVer = minSrvNodeVer;
         this.nodeIdToConsIdx = nodeIdToConsIdx;
@@ -177,6 +184,13 @@ public class DiscoCache {
     }
 
     /**
+     * @return MVCC coordinator node, may be {@code null}.
+     */
+    @Nullable public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /**
      * @return Topology version.
      */
     public AffinityTopologyVersion version() {
@@ -461,6 +475,7 @@ public class DiscoCache {
             ver,
             state == null ? this.state : state,
             loc,
+            mvccCrd,
             rmtNodes,
             allNodes,
             srvNodes,

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index a13f31e..d19e08b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -159,6 +160,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MVCC_ENABLED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_OFFHEAP_SIZE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM;
@@ -644,6 +646,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     updateClientNodes(node.id());
                 }
 
+                ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer);
+
                 boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id());
 
                 ChangeGlobalStateFinishMessage stateFinishMsg = null;
@@ -1204,6 +1208,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
         Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE);
 
+        Boolean locMvccEnabled = locNode.attribute(ATTR_MVCC_ENABLED);
+
         for (ClusterNode n : nodes) {
             int rmtJvmMajVer = nodeJavaMajorVersion(n);
 
@@ -1301,6 +1307,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     ", locNodeId=" + locNode.id() + ", rmtNode=" + U.toShortString(n) + "]");
             }
 
+            Boolean rmtMvccEnabled = n.attribute(ATTR_MVCC_ENABLED);
+
+            if (!F.eq(locMvccEnabled, rmtMvccEnabled)) {
+                throw new IgniteCheckedException("Remote node has MVCC mode different from local " +
+                    "[locId8=" +  U.id8(locNode.id()) +
+                    ", locMvccMode=" + (Boolean.TRUE.equals(locMvccEnabled) ? "ENABLED" : "DISABLED") +
+                    ", rmtId8=" + U.id8(n.id()) +
+                    ", rmtMvccMode=" + (Boolean.TRUE.equals(rmtMvccEnabled) ? "ENABLED" : "DISABLED") +
+                    ", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + U.toShortString(n) + "]");
+            }
+
             if (n.version().compareToIgnoreTimestamp(SERVICE_PERMISSIONS_SINCE) >= 0
                 && ctx.security().enabled() // Matters only if security enabled.
                ) {
@@ -2353,6 +2370,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         Collection<ClusterNode> topSnapshot) {
         assert topSnapshot.contains(loc);
 
+        MvccCoordinator mvccCrd = ctx.coordinators().coordinatorFromDiscoveryEvent();
+
         HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
         HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size());
 
@@ -2454,6 +2473,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             topVer,
             state,
             loc,
+            mvccCrd,
             Collections.unmodifiableList(rmtNodes),
             Collections.unmodifiableList(allNodes),
             Collections.unmodifiableList(srvNodes),
@@ -3362,6 +3382,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             topVer,
             discoCache.state(),
             discoCache.localNode(),
+            discoCache.mvccCoordinator(),
             discoCache.remoteNodes(),
             allNodes,
             discoCache.serverNodes(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 5475bef..ff0c66a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 
 /**
@@ -43,6 +44,18 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
     public void finishRecover() throws IgniteCheckedException;
 
     /**
+     * Initializes disk store structures.
+     *
+     * @param cacheId Cache id.
+     * @param partitions Partitions count.
+     * @param workingDir Working directory.
+     * @param tracker Allocation tracker.
+     * @throws IgniteCheckedException If failed.
+     */
+    void initialize(int cacheId, int partitions, String workingDir, AllocatedPageTracker tracker)
+        throws IgniteCheckedException;
+
+    /**
      * Callback called when a cache is starting.
      *
      * @param grpDesc Cache group descriptor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 585336a..a555aae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -184,7 +184,16 @@ public abstract class WALRecord {
         RESERVED,
 
         /** Rotated id part record. */
-        ROTATED_ID_PART_RECORD;
+        ROTATED_ID_PART_RECORD,
+
+        /** */
+        MVCC_DATA_PAGE_MARK_UPDATED_RECORD,
+
+        /** */
+        MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD,
+
+        /** */
+        MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD;
 
         /** */
         private static final RecordType[] VALS = RecordType.values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java
new file mode 100644
index 0000000..5e89f8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccMarkUpdatedRecord.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Page delta record carrying a new MVCC version (coordinator, counter, operation counter) for a data page item.
+ */
+public class DataPageMvccMarkUpdatedRecord extends PageDeltaRecord {
+    /** Item id. */
+    private int itemId;
+
+    /** New MVCC coordinator version. */
+    private long newMvccCrd;
+
+    /** New MVCC counter version. */
+    private long newMvccCntr;
+
+    /** New MVCC operation counter. */
+    private int newMvccOpCntr;
+
+    /**
+     * @param grpId Cache group ID.
+     * @param pageId Page ID.
+     * @param itemId Item id.
+     * @param newMvccCrd New MVCC coordinator version.
+     * @param newMvccCntr New MVCC counter version.
+     * @param newMvccOpCntr New MVCC operation counter.
+     */
+    public DataPageMvccMarkUpdatedRecord(int grpId, long pageId, int itemId, long newMvccCrd, long newMvccCntr, int newMvccOpCntr) {
+        super(grpId, pageId);
+
+        this.itemId = itemId;
+        this.newMvccCrd = newMvccCrd;
+        this.newMvccCntr = newMvccCntr;
+        this.newMvccOpCntr = newMvccOpCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+        DataPageIO io = PageIO.getPageIO(pageAddr);
+
+        io.updateNewVersion(pageAddr, itemId, pageMem.pageSize(), newMvccCrd, newMvccCntr, newMvccOpCntr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.MVCC_DATA_PAGE_MARK_UPDATED_RECORD;
+    }
+
+    /**
+     * @return Item id.
+     */
+    public int itemId() {
+        return itemId;
+    }
+
+    /**
+     * @return New MVCC coordinator version.
+     */
+    public long newMvccCrd() {
+        return newMvccCrd;
+    }
+
+    /**
+     * @return New MVCC counter version.
+     */
+    public long newMvccCntr() {
+        return newMvccCntr;
+    }
+
+    /**
+     * @return New MVCC operation counter.
+     */
+    public int newMvccOpCntr() {
+        return newMvccOpCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataPageMvccMarkUpdatedRecord.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java
new file mode 100644
index 0000000..4a244a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateNewTxStateHintRecord.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Page delta record that applies a tx state hint to a data page item via {@code DataPageIO.updateNewTxState}.
+ */
+public class DataPageMvccUpdateNewTxStateHintRecord extends PageDeltaRecord {
+    /** Item id. */
+    private int itemId;
+
+    /** Tx state hint. */
+    private byte txState;
+
+    /**
+     * @param grpId Cache group ID.
+     * @param pageId Page ID.
+     * @param itemId Item id.
+     * @param txState Tx state hint.
+     */
+    public DataPageMvccUpdateNewTxStateHintRecord(int grpId, long pageId, int itemId, byte txState) {
+        super(grpId, pageId);
+
+        this.itemId = itemId;
+        this.txState = txState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+        DataPageIO io = PageIO.getPageIO(pageAddr);
+
+        io.updateNewTxState(pageAddr, itemId, pageMem.pageSize(), txState);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD;
+    }
+
+    /**
+     * @return Item id.
+     */
+    public int itemId() {
+        return itemId;
+    }
+
+    /**
+     * @return Tx state hint.
+     */
+    public byte txState() {
+        return txState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataPageMvccUpdateNewTxStateHintRecord.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java
new file mode 100644
index 0000000..7e53609
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageMvccUpdateTxStateHintRecord.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Page delta record that applies a tx state hint to a data page item via {@code DataPageIO.updateTxState}.
+ */
+public class DataPageMvccUpdateTxStateHintRecord extends PageDeltaRecord {
+    /** Item id. */
+    private int itemId;
+
+    /** Tx state hint. */
+    private byte txState;
+
+    /**
+     * @param grpId Cache group ID.
+     * @param pageId Page ID.
+     * @param itemId Item id.
+     * @param txState Tx state hint.
+     */
+    public DataPageMvccUpdateTxStateHintRecord(int grpId, long pageId, int itemId, byte txState) {
+        super(grpId, pageId);
+
+        this.itemId = itemId;
+        this.txState = txState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+        DataPageIO io = PageIO.getPageIO(pageAddr);
+
+        io.updateTxState(pageAddr, itemId, pageMem.pageSize(), txState);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD;
+    }
+
+    /**
+     * @return Item id.
+     */
+    public int itemId() {
+        return itemId;
+    }
+
+    /**
+     * @return Tx state hint.
+     */
+    public byte txState() {
+        return txState;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataPageMvccUpdateTxStateHintRecord.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index f78ab60..1c321d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.processors.affinity;
 
-import org.apache.ignite.cluster.ClusterNode;
-
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 
 /**
  * Cached affinity calculations.
@@ -90,4 +90,9 @@ public interface AffinityAssignment {
      * @return Backup partitions for specified node ID.
      */
     public Set<Integer> backupPartitions(UUID nodeId);
+
+    /**
+     * @return Mvcc coordinator.
+     */
+    public MvccCoordinator mvccCoordinator();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index cbec1a1..f96bc9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -40,6 +41,9 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
     /** Topology version. */
     private final AffinityTopologyVersion topVer;
 
+    /** Mvcc coordinator. */
+    private final MvccCoordinator mvccCrd;
+
     /** Collection of calculated affinity nodes. */
     private List<List<ClusterNode>> assignment;
 
@@ -73,6 +77,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
         this.topVer = topVer;
         primary = new HashMap<>();
         backup = new HashMap<>();
+        mvccCrd = null;
         clientEvtChange = false;
     }
 
@@ -83,7 +88,8 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
      */
     GridAffinityAssignment(AffinityTopologyVersion topVer,
         List<List<ClusterNode>> assignment,
-        List<List<ClusterNode>> idealAssignment) {
+        List<List<ClusterNode>> idealAssignment,
+        MvccCoordinator mvccCrd) {
         assert topVer != null;
         assert assignment != null;
         assert idealAssignment != null;
@@ -91,6 +97,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
         this.topVer = topVer;
         this.assignment = assignment;
         this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment;
+        this.mvccCrd = mvccCrd;
 
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -110,6 +117,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
         idealAssignment = aff.idealAssignment;
         primary = aff.primary;
         backup = aff.backup;
+        mvccCrd = aff.mvccCrd;
 
         clientEvtChange = true;
     }
@@ -283,6 +291,11 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public int hashCode() {
         return topVer.hashCode();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 34e2b0a..cc2c17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.NodeOrderComparator;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -202,11 +203,25 @@ public class GridAffinityAssignmentCache {
      * @param affAssignment Affinity assignment for topology version.
      */
     public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
+        MvccCoordinator mvccCrd = ctx.cache().context().coordinators().currentCoordinator(topVer);
+
+        initialize(topVer, affAssignment, mvccCrd);
+    }
+
+    /**
+     * Initializes affinity with given topology version and assignment.
+     *
+     * @param topVer Topology version.
+     * @param affAssignment Affinity assignment for topology version.
+     * @param mvccCrd Mvcc coordinator.
+     */
+    public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) {
         assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
 
         assert idealAssignment != null;
+        assert mvccCrd == null || topVer.compareTo(mvccCrd.topologyVersion()) >= 0 : "[mvccCrd=" + mvccCrd + ", topVer=" + topVer + ']';
 
-        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
+        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment, mvccCrd);
 
         HistoryAffinityAssignment hAff = affCache.put(topVer, new HistoryAffinityAssignment(assignment));
 
@@ -745,7 +760,9 @@ public class GridAffinityAssignmentCache {
 
         idealAssignment(aff.idealAssignment());
 
-        initialize(aff.lastVersion(), aff.assignments(aff.lastVersion()));
+        AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion());
+
+        initialize(aff.lastVersion(), assign.assignment(), assign.mvccCoordinator());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 08333c3..4a0908c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -423,7 +423,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             try {
                 GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
                     (GridAffinityAssignment)assign0 :
-                    new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+                    new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator());
 
                 AffinityInfo info = new AffinityInfo(
                     cctx.config().getAffinity(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index abd5292..15d7e4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -184,7 +184,7 @@ class GridAffinityUtils {
 
             GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
                 (GridAffinityAssignment)assign0 :
-                new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+                new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator());
 
             return F.t(
                 affinityMessage(ctx, cctx.config().getAffinity()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index 94eaab4..1a6b2ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -44,17 +45,26 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
     /** */
     private final boolean clientEvtChange;
 
+    /** Mvcc coordinator. */
+    private final MvccCoordinator mvccCrd;
+
     /**
      * @param assign Assignment.
      */
-    public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+    HistoryAffinityAssignment(GridAffinityAssignment assign) {
         this.topVer = assign.topologyVersion();
         this.assignment = assign.assignment();
         this.idealAssignment = assign.idealAssignment();
+        this.mvccCrd = assign.mvccCoordinator();
         this.clientEvtChange = assign.clientEventChange();
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean clientEventChange() {
         return clientEvtChange;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0b448e1..5859452 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -58,6 +58,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -458,7 +460,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         if (grpHolder.client()) {
                             ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer);
 
-                            grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+                            grp.topology().updateTopologyVersion(topFut,
+                                discoCache,
+                                cctx.coordinators().currentCoordinator(),
+                                -1,
+                                false);
 
                             grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
 
@@ -506,6 +512,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 assert grp != null;
 
                 GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer,
+                    cctx.coordinators().currentCoordinator(),
                     null,
                     discoCache,
                     grp.affinity(),
@@ -528,7 +535,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         new ClusterTopologyServerNotFoundException("All server nodes left grid."));
                 }
 
-                grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+                grp.topology().updateTopologyVersion(topFut,
+                    discoCache,
+                    cctx.coordinators().currentCoordinator(),
+                    -1,
+                    false);
 
                 grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null, null);
 
@@ -1284,7 +1295,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             fetchFut.init(false);
 
-            fetchAffinity(evts.topologyVersion(), evts, evts.discoveryCache(), aff, fetchFut);
+            fetchAffinity(evts.topologyVersion(),
+                cctx.coordinators().currentCoordinator(),
+                evts,
+                evts.discoveryCache(),
+                aff,
+                fetchFut);
         }
     }
 
@@ -1682,6 +1698,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             int grpId = fetchFut.groupId();
 
             fetchAffinity(topVer,
+                cctx.coordinators().currentCoordinator(),
                 fut.events(),
                 fut.events().discoveryCache(),
                 cctx.cache().cacheGroup(grpId).affinity(),
@@ -1691,6 +1708,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
     /**
      * @param topVer Topology version.
+     * @param mvccCrd Mvcc coordinator to set in affinity.
      * @param events Discovery events.
      * @param discoCache Discovery data cache.
      * @param affCache Affinity.
@@ -1698,7 +1716,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      * @return Affinity assignment response.
      */
-    private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer,
+    private GridDhtAffinityAssignmentResponse fetchAffinity(
+        AffinityTopologyVersion topVer,
+        MvccCoordinator mvccCrd,
         @Nullable ExchangeDiscoveryEvents events,
         DiscoCache discoCache,
         GridAffinityAssignmentCache affCache,
@@ -1711,7 +1731,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (res == null) {
             List<List<ClusterNode>> aff = affCache.calculate(topVer, events, discoCache);
 
-            affCache.initialize(topVer, aff);
+            affCache.initialize(topVer, aff, mvccCrd);
         }
         else {
             List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache);
@@ -1728,7 +1748,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             assert aff != null : res;
 
-            affCache.initialize(topVer, aff);
+            affCache.initialize(topVer, aff, mvccCrd);
         }
 
         return res;
@@ -1847,6 +1867,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
                                 throws IgniteCheckedException {
                                 fetchAffinity(prev.topologyVersion(),
+                                    null, // Pass null mvcc coordinator, this affinity version should be used for queries.
                                     prev.events(),
                                     prev.events().discoveryCache(),
                                     aff,

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
index 49f77fa..614d7c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryInfoCollection.java
@@ -37,6 +37,18 @@ public class CacheEntryInfoCollection implements Message {
     @GridDirectCollection(GridCacheEntryInfo.class)
     private List<GridCacheEntryInfo> infos;
 
+    /** Default constructor; leaves the list of cache entry infos unset. */
+    public CacheEntryInfoCollection() {
+        // No-op
+    }
+
+    /**
+     * @param infos List of cache entry info.
+     */
+    public CacheEntryInfoCollection(List<GridCacheEntryInfo> infos) {
+        this.infos = infos;
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index c8fe283..c100d16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TopologyValidator;
 import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -60,6 +61,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
@@ -147,6 +149,9 @@ public class CacheGroupContext {
     /** */
     private boolean qryEnabled;
 
+    /** Mvcc flag. */
+    private boolean mvccEnabled;
+
     /** MXBean. */
     private CacheGroupMetricsMXBean mxBean;
 
@@ -218,6 +223,28 @@ public class CacheGroupContext {
         caches = new ArrayList<>();
 
         mxBean = new CacheGroupMetricsMXBeanImpl(this);
+
+        mvccEnabled = mvccEnabled(ctx.gridConfig(), ccfg, cacheType);
+    }
+
+    /**
+     * @param cfg Ignite configuration.
+     * @param ccfg Cache configuration.
+     * @param cacheType Cache type.
+     * @return {@code True} if mvcc is on in {@code cfg} and the cache is a non-LOCAL, TRANSACTIONAL user cache.
+     */
+    public static boolean mvccEnabled(IgniteConfiguration cfg, CacheConfiguration ccfg, CacheType cacheType) {
+        return cfg.isMvccEnabled() &&
+            cacheType == CacheType.USER &&
+            ccfg.getCacheMode() != LOCAL &&
+            ccfg.getAtomicityMode() == TRANSACTIONAL;
+    }
+
+    /**
+     * @return Mvcc flag.
+     */
+    public boolean mvccEnabled() {
+        return mvccEnabled;
     }
 
     /**
@@ -396,6 +423,13 @@ public class CacheGroupContext {
     }
 
     /**
+     * @return {@code True} if the cache type of this group is a user cache type.
+     */
+    public boolean userCache() {
+        return cacheType.userCache();
+    }
+
+    /**
      * Adds rebalancing event.
      *
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 8a7afe7..53f9e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -163,7 +163,7 @@ public class CacheOperationContext implements Serializable {
     /**
      * Gets data center ID.
      *
-     * @return Client ID.
+     * @return Datacenter ID.
      */
     @Nullable public Byte dataCenterId() {
         return dataCenterId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 3aaf7f3..8eab9c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -147,7 +147,7 @@ class ClusterCachesInfo {
             if (ccfg == null)
                 grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config());
             else
-                validateCacheGroupConfiguration(ccfg, info.cacheData().config());
+                validateCacheGroupConfiguration(ccfg, info.cacheData().config(), info.cacheType());
         }
 
         String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
@@ -220,7 +220,7 @@ class ClusterCachesInfo {
                 }
 
                 if (checkConsistency)
-                    validateStartCacheConfiguration(locCfg);
+                    validateStartCacheConfiguration(locCfg, cacheData.cacheType());
             }
         }
 
@@ -1864,16 +1864,17 @@ class ClusterCachesInfo {
 
     /**
      * @param ccfg Cache configuration to start.
+     * @param cacheType Cache type.
      * @throws IgniteCheckedException If failed.
      */
-    public void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException {
+    void validateStartCacheConfiguration(CacheConfiguration ccfg, CacheType cacheType) throws IgniteCheckedException {
         if (ccfg.getGroupName() != null) {
             CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName());
 
             if (grpDesc != null) {
                 assert ccfg.getGroupName().equals(grpDesc.groupName());
 
-                validateCacheGroupConfiguration(grpDesc.config(), ccfg);
+                validateCacheGroupConfiguration(grpDesc.config(), ccfg, cacheType);
             }
         }
     }
@@ -1881,9 +1882,10 @@ class ClusterCachesInfo {
     /**
      * @param cfg Existing configuration.
      * @param startCfg Cache configuration to start.
+     * @param cacheType Cache type.
      * @throws IgniteCheckedException If validation failed.
      */
-    private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg)
+    private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg, CacheType cacheType)
         throws IgniteCheckedException {
         GridCacheAttributes attr1 = new GridCacheAttributes(cfg);
         GridCacheAttributes attr2 = new GridCacheAttributes(startCfg);
@@ -1891,6 +1893,11 @@ class ClusterCachesInfo {
         CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "cacheMode", "Cache mode",
             cfg.getCacheMode(), startCfg.getCacheMode(), true);
 
+        CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "mvccEnabled", "MVCC mode",
+            CacheGroupContext.mvccEnabled(ctx.config(), cfg, cacheType),
+            CacheGroupContext.mvccEnabled(ctx.config(), startCfg, cacheType),
+            true);
+
         CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinity", "Affinity function",
             attr1.cacheAffinityClassName(), attr2.cacheAffinityClassName(), true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 4046c98..34ed048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -51,11 +55,20 @@ public class ExchangeContext {
     /** */
     private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
 
+    /** {@code True} if new mvcc coordinator was assigned during this exchange. */
+    private final boolean newMvccCrd;
+
+    /** Currently running mvcc queries, initialized when mvcc coordinator is changed. */
+    private Map<UUID, GridLongList> activeQueries;
+
     /**
      * @param crd Coordinator flag.
+     * @param newMvccCrd {@code True} if new coordinator assigned during this exchange.
      * @param fut Exchange future.
      */
-    public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) {
+    public ExchangeContext(boolean crd, boolean newMvccCrd, GridDhtPartitionsExchangeFuture fut) {
+        this.newMvccCrd = newMvccCrd;
+
         int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
 
         if (compatibilityNode || (crd && fut.localJoinExchange())) {
@@ -124,6 +137,34 @@ public class ExchangeContext {
         return merge;
     }
 
+    /**
+     * @return {@code True} if a new node was assigned as mvcc coordinator during this exchange.
+     */
+    public boolean newMvccCoordinator() {
+        return newMvccCrd;
+    }
+
+    /**
+     * @return Active queries mapped by node ID, {@code null} if no queries were added.
+     */
+    public Map<UUID, GridLongList> activeQueries() {
+        return activeQueries;
+    }
+
+    /**
+     * @param nodeId Node ID, used as the key for the node queries.
+     * @param nodeQueries Node queries, ignored if {@code null}.
+     */
+    public void addActiveQueries(UUID nodeId, @Nullable GridLongList nodeQueries) {
+        if (nodeQueries == null)
+            return;
+
+        if (activeQueries == null)
+            activeQueries = new HashMap<>();
+
+        activeQueries.put(nodeId, nodeQueries);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ExchangeContext.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
index 96c5e29..c99eb00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
@@ -49,6 +49,7 @@ import org.apache.ignite.cache.query.QueryMetrics;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.internal.AsyncSupportAdapter;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.GridKernalState;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -145,6 +146,8 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
         CacheOperationGate opGate = onEnter();
 
         try {
+            MvccUtils.verifyMvccOperationSupport(delegate.context(), "withExpiryPolicy");
+
             return new GatewayProtectedCacheProxy<>(delegate, opCtx.withExpiryPolicy(plc), lock);
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index bc14c54..159d681 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -92,6 +92,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
@@ -138,7 +140,6 @@ import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.JobContextResource;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -816,6 +817,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
+        //TODO IGNITE-7955
+        MvccUtils.verifyMvccOperationSupport(ctx, "Peek");
+
         PeekModes modes = parsePeekModes(peekModes, false);
 
         KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
@@ -1121,6 +1125,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     @Override public void clearLocally(boolean srv, boolean near, boolean readers) {
         ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
+        //TODO IGNITE-7952
+        MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
         List<GridCacheClearAllRunnable<K, V>> jobs = splitClearLocally(srv, near, readers);
 
         if (!F.isEmpty(jobs)) {
@@ -1190,6 +1197,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @throws IgniteCheckedException In case of error.
      */
     private void clear(@Nullable Set<? extends K> keys) throws IgniteCheckedException {
+        //TODO IGNITE-7952
+        MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
         if (isLocal()) {
             if (keys == null)
                 clearLocally(true, false, false);
@@ -1207,6 +1217,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Future.
      */
     private IgniteInternalFuture<?> clearAsync(@Nullable final Set<? extends K> keys) {
+        //TODO IGNITE-7952
+        MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
         if (isLocal())
             return clearLocallyAsync(keys);
         else
@@ -1262,6 +1275,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (F.isEmpty(keys))
             return;
 
+        //TODO IGNITE-7952
+        MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
         GridCacheVersion obsoleteVer = ctx.versions().next();
 
         for (KeyCacheObject key : keys) {
@@ -1881,7 +1897,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             skipVals,
             /*keep cache objects*/false,
             recovery,
-            needVer);
+            needVer,
+            null); // TODO IGNITE-7371
     }
 
     /**
@@ -1896,6 +1913,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param skipVals Skip values flag.
      * @param keepCacheObjects Keep cache objects.
      * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param mvccSnapshot MVCC snapshot.
      * @return Future.
      */
     protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
@@ -1910,7 +1928,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         final boolean skipVals,
         final boolean keepCacheObjects,
         final boolean recovery,
-        final boolean needVer
+        final boolean needVer,
+        MvccSnapshot mvccSnapshot
     ) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1967,7 +1986,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                             boolean skipEntry = readNoEntry;
 
                             if (readNoEntry) {
-                                CacheDataRow row = ctx.offheap().read(ctx, key);
+                                CacheDataRow row = mvccSnapshot != null ? ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
+                                    ctx.offheap().read(ctx, key);
 
                                 if (row != null) {
                                     long expireTime = row.expireTime();
@@ -2030,6 +2050,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                         taskName,
                                         expiry,
                                         !deserializeBinary,
+                                        mvccSnapshot,
                                         readerArgs);
 
                                     assert res != null;
@@ -2054,6 +2075,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                         taskName,
                                         expiry,
                                         !deserializeBinary,
+                                        mvccSnapshot,
                                         readerArgs);
 
                                     if (res == null)
@@ -3072,7 +3094,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size()));
 
         do {
-            for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null);
+            for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null, null);
                 it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; )
                 keys.add((K)it.next().key());
 
@@ -3397,6 +3419,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(keys);
 
+        //TODO IGNITE-7764
+        MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
+
         IgniteInternalFuture<Boolean> fut = lockAllAsync(keys, timeout);
 
         boolean isInterrupted = false;
@@ -3425,6 +3450,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKey(key);
 
+        //TODO IGNITE-7764
+        MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
+
         return lockAllAsync(Collections.singletonList(key), timeout);
     }
 
@@ -3534,6 +3562,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** {@inheritDoc} */
     @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args)
         throws IgniteCheckedException {
+        //TODO IGNITE-7954
+        MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
         final boolean replicate = ctx.isDrEnabled();
         final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
@@ -3671,6 +3702,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!ctx.store().configured())
             return new GridFinishedFuture<>();
 
+        //TODO IGNITE-7954
+        MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
@@ -3755,6 +3789,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     public void localLoad(Collection<? extends K> keys, @Nullable ExpiryPolicy plc, final boolean keepBinary)
         throws IgniteCheckedException {
+        //TODO IGNITE-7954
+        MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
         final boolean replicate = ctx.isDrEnabled();
         final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
@@ -3827,6 +3864,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         assert !F.isEmpty(nodes) : "There are not datanodes fo cache: " + ctx.name();
 
+        //TODO IGNITE-7954
+        MvccUtils.verifyMvccOperationSupport(ctx, "Load");
+
         final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
         ComputeTaskInternalFuture fut = ctx.kernalContext().closure().callAsync(BROADCAST,
@@ -4185,6 +4225,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     READ_COMMITTED,
                     tCfg.getDefaultTxTimeout(),
                     !ctx.skipStore(),
+                    false,
                     0,
                     null
                 );
@@ -4286,6 +4327,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     READ_COMMITTED,
                     txCfg.getDefaultTxTimeout(),
                     !skipStore,
+                    false,
                     0,
                     null);
 
@@ -4556,6 +4598,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param readers Whether to clear readers.
      */
     private boolean clearLocally0(K key, boolean readers) {
+        //TODO IGNITE-7952
+        MvccUtils.verifyMvccOperationSupport(ctx, "Clear");
+
         ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
         if (keyCheck)
@@ -4593,6 +4638,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKey(key);
 
+        //TODO IGNITE-7956
+        MvccUtils.verifyMvccOperationSupport(ctx, "Evict");
+
         return evictx(key, ctx.versions().next(), CU.empty0());
     }
 
@@ -4606,6 +4654,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKey(keys);
 
+        //TODO IGNITE-7956
+        MvccUtils.verifyMvccOperationSupport(ctx, "Evict");
+
         GridCacheVersion obsoleteVer = ctx.versions().next();
 
         try {
@@ -4890,7 +4941,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             /*transformClo*/null,
             /*taskName*/null,
             /*expiryPlc*/null,
-            !deserializeBinary);
+            !deserializeBinary,
+            null); // TODO IGNITE-7371
 
         if (val == null)
             return null;
@@ -4950,6 +5002,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 READ_COMMITTED,
                 CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(),
                 opCtx == null || !opCtx.skipStore(),
+                false,
                 0,
                 null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index c9ee38c..cf4344d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
@@ -243,6 +244,10 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
         return aff0.cachedAffinity(topVer);
     }
 
+    public MvccCoordinator mvccCoordinator(AffinityTopologyVersion topVer) {
+        return assignment(topVer).mvccCoordinator();
+    }
+
     /**
      * @param key Key to check.
      * @param topVer Topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 61b1878..ac9de7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2114,6 +2114,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return {@code True} if mvcc is enabled for cache (as reported by its cache group).
+     */
+    public boolean mvccEnabled() {
+        return grp.mvccEnabled();
+    }
+
+    /**
      * @param part Partition.
      * @param topVer Topology version.
      * @return {@code True} if partition is available locally.