You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/11/13 07:41:36 UTC

[2/2] ignite git commit: IGNITE-10052: MVCC: fixed local node recovery. This closes #5245. This closes #5345.

IGNITE-10052: MVCC: fixed local node recovery. This closes #5245. This closes #5345.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a719aa9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a719aa9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a719aa9c

Branch: refs/heads/master
Commit: a719aa9cafdf372aeaf067c5cecfc43b7e09989f
Parents: 2439ade
Author: AMRepo <an...@gmail.com>
Authored: Tue Nov 13 10:38:28 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 13 10:38:28 2018 +0300

----------------------------------------------------------------------
 .../internal/pagemem/wal/record/DataEntry.java  |   1 +
 .../pagemem/wal/record/LazyDataEntry.java       |   1 -
 .../pagemem/wal/record/LazyMvccDataEntry.java   | 149 ++++++++++
 .../pagemem/wal/record/MvccDataEntry.java       |  75 +++++
 .../pagemem/wal/record/MvccDataRecord.java      |  69 +++++
 .../pagemem/wal/record/MvccTxRecord.java        |  92 +++++++
 .../internal/pagemem/wal/record/TxRecord.java   |  34 +--
 .../pagemem/wal/record/UnwrapMvccDataEntry.java | 130 +++++++++
 .../internal/pagemem/wal/record/WALRecord.java  |  10 +-
 .../processors/cache/GridCacheMapEntry.java     | 107 ++++++--
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../cache/IgniteCacheOffheapManager.java        |  39 ++-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 106 ++++++++
 .../dht/GridDhtTxAbstractEnlistFuture.java      |  15 -
 .../cache/mvcc/MvccProcessorImpl.java           |  62 +++--
 .../processors/cache/mvcc/MvccUtils.java        |   9 +-
 .../processors/cache/mvcc/txlog/TxLog.java      |  37 ++-
 .../GridCacheDatabaseSharedManager.java         | 104 ++++++-
 .../persistence/GridCacheOffheapManager.java    |   8 +
 .../reader/StandaloneWalRecordsIterator.java    |  70 +++--
 .../wal/serializer/RecordDataV1Serializer.java  |  26 +-
 .../wal/serializer/RecordDataV2Serializer.java  | 141 +++++++++-
 .../wal/serializer/TxRecordSerializer.java      | 132 ++++++++-
 .../cache/transactions/IgniteTxAdapter.java     |  28 +-
 .../cache/transactions/IgniteTxHandler.java     |  12 +-
 .../cache/transactions/IgniteTxManager.java     |  80 +++++-
 .../cache/tree/mvcc/data/MvccUpdateDataRow.java |   2 +-
 .../ignite/failure/IoomFailureHandlerTest.java  |  28 +-
 .../cache/mvcc/CacheMvccProcessorTest.java      |  20 +-
 .../cache/mvcc/CacheMvccTxFailoverTest.java     | 272 +++++++++++++++++++
 .../cache/mvcc/CacheMvccVacuumTest.java         |   6 +
 .../testsuites/IgniteCacheMvccTestSuite.java    |   2 +
 .../ignite/development/utils/WalStat.java       |   4 +-
 33 files changed, 1646 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index d13a68a..dd05726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -61,6 +61,7 @@ public class DataEntry {
     @GridToStringInclude
     protected long partCnt;
 
+    /** Constructor. */
     private DataEntry() {
         // No-op, used from factory methods.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
index 6b56da5..403d778 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
@@ -143,5 +143,4 @@ public class LazyDataEntry extends DataEntry {
     public byte[] getValBytes() {
         return valBytes;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java
new file mode 100644
index 0000000..15b1468
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+
+/**
+ * Represents Data Entry ({@link #key}, {@link #val value}) pair update {@link #op operation}. <br>
+ * This Data entry was not converted to key, value pair during record deserialization.
+ */
+public class LazyMvccDataEntry extends MvccDataEntry {
+    /** */
+    private GridCacheSharedContext cctx;
+
+    /** Data Entry key type code. See {@link CacheObject} for built-in value type codes */
+    private byte keyType;
+
+    /** Key value bytes. */
+    private byte[] keyBytes;
+
+    /** Data Entry Value type code. See {@link CacheObject} for built-in value type codes */
+    private byte valType;
+
+    /** Value value bytes. */
+    private byte[] valBytes;
+
+    /**
+     * @param cctx Shared context.
+     * @param cacheId Cache ID.
+     * @param keyType Object type code for Key.
+     * @param keyBytes Data Entry Key value bytes.
+     * @param valType Object type code for Value.
+     * @param valBytes Data Entry Value value bytes.
+     * @param op Operation.
+     * @param nearXidVer Near transaction version.
+     * @param writeVer Write version.
+     * @param expireTime Expire time.
+     * @param partId Partition ID.
+     * @param partCnt Partition counter.
+     * @param mvccVer Mvcc version.
+     */
+    public LazyMvccDataEntry(
+        GridCacheSharedContext cctx,
+        int cacheId,
+        byte keyType,
+        byte[] keyBytes,
+        byte valType,
+        byte[] valBytes,
+        GridCacheOperation op,
+        GridCacheVersion nearXidVer,
+        GridCacheVersion writeVer,
+        long expireTime,
+        int partId,
+        long partCnt,
+        MvccVersion mvccVer
+    ) {
+        super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, mvccVer);
+
+        this.cctx = cctx;
+        this.keyType = keyType;
+        this.keyBytes = keyBytes;
+        this.valType = valType;
+        this.valBytes = valBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject key() {
+        try {
+            if (key == null) {
+                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+                if (cacheCtx == null)
+                    throw new IgniteException("Failed to find cache context for the given cache ID: " + cacheId);
+
+                IgniteCacheObjectProcessor co = cctx.kernalContext().cacheObjects();
+
+                key = co.toKeyCacheObject(cacheCtx.cacheObjectContext(), keyType, keyBytes);
+
+                if (key.partition() == -1)
+                    key.partition(partId);
+            }
+
+            return key;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject value() {
+        if (val == null && valBytes != null) {
+            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+            if (cacheCtx == null)
+                throw new IgniteException("Failed to find cache context for the given cache ID: " + cacheId);
+
+            IgniteCacheObjectProcessor co = cctx.kernalContext().cacheObjects();
+
+            val = co.toCacheObject(cacheCtx.cacheObjectContext(), valType, valBytes);
+        }
+
+        return val;
+    }
+
+    /** @return Data Entry Key type code. See {@link CacheObject} for built-in value type codes */
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    /** @return Key value bytes. */
+    public byte[] getKeyBytes() {
+        return keyBytes;
+    }
+
+    /** @return Data Entry Value type code. See {@link CacheObject} for built-in value type codes */
+    public byte getValType() {
+        return valType;
+    }
+
+    /** @return Value value bytes. */
+    public byte[] getValBytes() {
+        return valBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
new file mode 100644
index 0000000..6480f6f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents Data Entry ({@link #key}, {@link #val value}) pair for mvcc update {@link #op operation} in WAL log.
+ */
+public class MvccDataEntry extends DataEntry {
+    /** Entry version. */
+    private MvccVersion mvccVer;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param val Value or null for delete operation.
+     * @param op Operation.
+     * @param nearXidVer Near transaction version.
+     * @param writeVer Write version.
+     * @param expireTime Expire time.
+     * @param partId Partition ID.
+     * @param partCnt Partition counter.
+     * @param mvccVer Mvcc version.
+     */
+    public MvccDataEntry(
+        int cacheId,
+        KeyCacheObject key,
+        @Nullable CacheObject val,
+        GridCacheOperation op,
+        GridCacheVersion nearXidVer,
+        GridCacheVersion writeVer,
+        long expireTime,
+        int partId,
+        long partCnt,
+        MvccVersion mvccVer
+    ) {
+        super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt);
+
+        this.mvccVer = mvccVer;
+    }
+
+    /**
+     * @return Mvcc version.
+     */
+    public MvccVersion mvccVer() {
+        return mvccVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccDataEntry.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java
new file mode 100644
index 0000000..276ba1b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataRecord.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Logical data record with cache operation description.
+ * This record contains information about operation we want to do.
+ * Contains operation type (put, remove) and (Key, Value, Version) for each {@link MvccDataEntry}
+ */
+public class MvccDataRecord extends DataRecord {
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.MVCC_DATA_RECORD;
+    }
+
+    /**
+     * @param writeEntry Write entry.
+     */
+    public MvccDataRecord(MvccDataEntry writeEntry) {
+        this(writeEntry, U.currentTimeMillis());
+    }
+
+    /**
+     * @param writeEntries Write entries.
+     */
+    public MvccDataRecord(List<DataEntry> writeEntries) {
+        this(writeEntries, U.currentTimeMillis());
+    }
+
+    /**
+     * @param writeEntry Write entry.
+     */
+    public MvccDataRecord(MvccDataEntry writeEntry, long timestamp) {
+        this(Collections.singletonList(writeEntry), timestamp);
+    }
+
+    /**
+     * @param writeEntries Write entries.
+     * @param timestamp TimeStamp.
+     */
+    public MvccDataRecord(List<DataEntry> writeEntries, long timestamp) {
+        super(writeEntries, timestamp);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccDataRecord.class, this, "super", super.toString());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
new file mode 100644
index 0000000..82c4409
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccTxRecord.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Logical data record indented for MVCC transaction related actions.<br>
+ * This record is marker of prepare, commit, and rollback transactions.
+ */
+public class MvccTxRecord extends TxRecord {
+    /** Transaction mvcc snapshot version. */
+    private final MvccVersion mvccVer;
+
+    /**
+     * @param state Transaction state.
+     * @param nearXidVer Transaction id.
+     * @param writeVer Transaction entries write topology version.
+     * @param participatingNodes Primary -> Backup nodes compact IDs participating in transaction.
+     * @param mvccVer Transaction snapshot version.
+     */
+    public MvccTxRecord(
+        TransactionState state,
+        GridCacheVersion nearXidVer,
+        GridCacheVersion writeVer,
+        @Nullable Map<Short, Collection<Short>> participatingNodes,
+        MvccVersion mvccVer
+    ) {
+        super(state, nearXidVer, writeVer, participatingNodes);
+
+        this.mvccVer = mvccVer;
+    }
+
+    /**
+     * @param state Transaction state.
+     * @param nearXidVer Transaction id.
+     * @param writeVer Transaction entries write topology version.
+     * @param mvccVer Transaction snapshot version.
+     * @param participatingNodes Primary -> Backup nodes participating in transaction.
+     * @param ts TimeStamp.
+     */
+    public MvccTxRecord(
+        TransactionState state,
+        GridCacheVersion nearXidVer,
+        GridCacheVersion writeVer,
+        @Nullable Map<Short, Collection<Short>> participatingNodes,
+        MvccVersion mvccVer,
+        long ts
+    ) {
+        super(state, nearXidVer, writeVer, participatingNodes, ts);
+
+        this.mvccVer = mvccVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.MVCC_TX_RECORD;
+    }
+
+    /**
+     * @return Mvcc version.
+     */
+    public MvccVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccTxRecord.class, this, "super", super.toString());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
index 90df004..ae8d112 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
@@ -32,14 +32,14 @@ import org.jetbrains.annotations.Nullable;
 public class TxRecord extends TimeStampRecord {
     /** Transaction state. */
     @GridToStringInclude
-    private TransactionState state;
+    private final TransactionState state;
 
     /** Global transaction identifier within cluster, assigned by transaction coordinator. */
     @GridToStringInclude
-    private GridCacheVersion nearXidVer;
+    private final GridCacheVersion nearXidVer;
 
     /** Transaction entries write topology version. */
-    private GridCacheVersion writeVer;
+    private final GridCacheVersion writeVer;
 
     /**
      * Transaction participating nodes.
@@ -103,13 +103,6 @@ public class TxRecord extends TimeStampRecord {
     }
 
     /**
-     * @param nearXidVer Near xid version.
-     */
-    public void nearXidVersion(GridCacheVersion nearXidVer) {
-        this.nearXidVer = nearXidVer;
-    }
-
-    /**
      * @return DHT version.
      */
     public GridCacheVersion writeVersion() {
@@ -117,13 +110,6 @@ public class TxRecord extends TimeStampRecord {
     }
 
     /**
-     * @param writeVer DHT version.
-     */
-    public void dhtVersion(GridCacheVersion writeVer) {
-        this.writeVer = writeVer;
-    }
-
-    /**
      * @return Transaction state.
      */
     public TransactionState state() {
@@ -131,26 +117,12 @@ public class TxRecord extends TimeStampRecord {
     }
 
     /**
-     * @param state Transaction state.
-     */
-    public void state(TransactionState state) {
-        this.state = state;
-    }
-
-    /**
      * @return Primary -> backup participating nodes compact IDs.
      */
     public Map<Short, Collection<Short>> participatingNodes() {
         return participatingNodes;
     }
 
-    /**
-     * @param participatingNodeIds Primary -> backup participating nodes compact IDs.
-     */
-    public void participatingNodes(Map<Short, Collection<Short>> participatingNodeIds) {
-        this.participatingNodes = participatingNodeIds;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TxRecord.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java
new file mode 100644
index 0000000..25d7a6e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Data Entry for automatic unwrapping key and value from Mvcc Data Entry
+ */
+public class UnwrapMvccDataEntry extends MvccDataEntry {
+    /** Cache object value context. Context is used for unwrapping objects. */
+    private final CacheObjectValueContext cacheObjValCtx;
+
+    /** Keep binary. This flag disables converting of non primitive types (BinaryObjects). */
+    private boolean keepBinary;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param val Value or null for delete operation.
+     * @param op Operation.
+     * @param nearXidVer Near transaction version.
+     * @param writeVer Write version.
+     * @param expireTime Expire time.
+     * @param partId Partition ID.
+     * @param partCnt Partition counter.
+     * @param mvccVer Mvcc version.
+     * @param cacheObjValCtx cache object value context for unwrapping objects.
+     * @param keepBinary disable unwrapping for non primitive objects, Binary Objects would be returned instead.
+     */
+    public UnwrapMvccDataEntry(
+        final int cacheId,
+        final KeyCacheObject key,
+        final CacheObject val,
+        final GridCacheOperation op,
+        final GridCacheVersion nearXidVer,
+        final GridCacheVersion writeVer,
+        final long expireTime,
+        final int partId,
+        final long partCnt,
+        MvccVersion mvccVer,
+        final CacheObjectValueContext cacheObjValCtx,
+        final boolean keepBinary) {
+        super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, mvccVer);
+
+        this.cacheObjValCtx = cacheObjValCtx;
+        this.keepBinary = keepBinary;
+    }
+
+    /**
+     * Unwraps key value from cache key object into primitive boxed type or source class. If client classes were used
+     * in key, call of this method requires classes to be available in classpath.
+     *
+     * @return Key which was placed into cache. Or null if failed to convert.
+     */
+    public Object unwrappedKey() {
+        try {
+            if (keepBinary && key instanceof BinaryObject)
+                return key;
+
+            Object unwrapped = key.value(cacheObjValCtx, false);
+
+            if (unwrapped instanceof BinaryObject) {
+                if (keepBinary)
+                    return unwrapped;
+                unwrapped = ((BinaryObject)unwrapped).deserialize();
+            }
+
+            return unwrapped;
+        }
+        catch (Exception e) {
+            cacheObjValCtx.kernalContext().log(UnwrapMvccDataEntry.class)
+                .error("Unable to convert key [" + key + "]", e);
+
+            return null;
+        }
+    }
+
+    /**
+     * Unwraps value value from cache value object into primitive boxed type or source class. If client classes were
+     * used in key, call of this method requires classes to be available in classpath.
+     *
+     * @return Value which was placed into cache. Or null for delete operation or for failure.
+     */
+    public Object unwrappedValue() {
+        try {
+            if (val == null)
+                return null;
+
+            if (keepBinary && val instanceof BinaryObject)
+                return val;
+
+            return val.value(cacheObjValCtx, false);
+        }
+        catch (Exception e) {
+            cacheObjValCtx.kernalContext().log(UnwrapMvccDataEntry.class)
+                .error("Unable to convert value [" + value() + "]", e);
+            return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass().getSimpleName() + "[k = " + unwrappedKey() + ", v = [ "
+            + unwrappedValue()
+            + "], super = ["
+            + super.toString() + "]]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index e22adc0..ad8a2a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -198,8 +198,14 @@ public abstract class WALRecord {
         /** Encrypted WAL-record. */
         ENCRYPTED_RECORD,
 
-        /** Ecnrypted data record */
-        ENCRYPTED_DATA_RECORD;
+        /** Ecnrypted data record. */
+        ENCRYPTED_DATA_RECORD,
+
+        /** Mvcc data record. */
+        MVCC_DATA_RECORD,
+
+        /** Mvcc Tx state change record. */
+        MVCC_TX_RECORD;
 
         /** */
         private static final RecordType[] VALS = RecordType.values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index cc0a78e..fa4cc98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.UnregisteredClassException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -1216,7 +1218,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) {
-                logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
+                logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
                     cctx.cacheId(),
                     key,
                     val,
@@ -1226,7 +1228,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     newVer,
                     expireTime,
                     key.partition(),
-                    0L)));
+                    0L,
+                    mvccVer)
+                ));
             }
 
             update(val, expireTime, ttl, newVer, true);
@@ -1343,7 +1347,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                logPtr = logTxUpdate(tx, null, 0, 0L);
+                logPtr = logMvccUpdate(tx, null, 0, 0L, mvccVer);
 
             update(null, 0, 0, newVer, true);
 
@@ -3433,17 +3437,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     updateCntr = nextPartitionCounter(topVer, true, null);
 
                 if (walEnabled) {
-                    cctx.shared().wal().log(new DataRecord(new DataEntry(
-                        cctx.cacheId(),
-                        key,
-                        val,
-                        val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE,
-                        null,
-                        ver,
-                        expireTime,
-                        partition(),
-                        updateCntr
-                    )));
+                    if (cctx.mvccEnabled()) {
+                        cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
+                            cctx.cacheId(),
+                            key,
+                            val,
+                            val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE,
+                            null,
+                            ver,
+                            expireTime,
+                            partition(),
+                            updateCntr,
+                            mvccVer
+                        )));
+                    } else {
+                        cctx.shared().wal().log(new DataRecord(new DataEntry(
+                            cctx.cacheId(),
+                            key,
+                            val,
+                            val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE,
+                            null,
+                            ver,
+                            expireTime,
+                            partition(),
+                            updateCntr
+                        )));
+                    }
                 }
 
                 drReplicate(drType, val, ver, topVer);
@@ -4297,7 +4316,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      */
     protected WALPointer logTxUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr)
         throws IgniteCheckedException {
-        assert cctx.transactional();
+        assert cctx.transactional() && !cctx.transactionalSnapshot();
 
         if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked()
             GridCacheOperation op;
@@ -4322,6 +4341,43 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /**
+     * @param tx Transaction.
+     * @param val Value.
+     * @param expireTime Expire time (or 0 if not applicable).     *
+     * @param updCntr Update counter.
+     * @param mvccVer Mvcc version.
+     * @throws IgniteCheckedException In case of log failure.
+     */
+    protected WALPointer logMvccUpdate(IgniteInternalTx tx, CacheObject val, long expireTime, long updCntr,
+        MvccSnapshot mvccVer)
+        throws IgniteCheckedException {
+        assert mvccVer != null;
+        assert cctx.transactionalSnapshot();
+
+        if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked()
+            GridCacheOperation op;
+            if (val == null)
+                op = GridCacheOperation.DELETE;
+            else
+                op = this.val == null ? GridCacheOperation.CREATE : GridCacheOperation.UPDATE;
+
+            return cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
+                cctx.cacheId(),
+                key,
+                val,
+                op,
+                tx.nearXidVersion(),
+                tx.writeVersion(),
+                expireTime,
+                key.partition(),
+                updCntr,
+                mvccVer)));
+        }
+        else
+            return null;
+    }
+
+    /**
      * Removes value from offheap.
      *
      * @throws IgniteCheckedException If failed.
@@ -5159,16 +5215,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
 
                 if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                    logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
-                        cctx.cacheId(),
-                        entry.key(),
-                        null,
-                        DELETE,
-                        tx.nearXidVersion(),
-                        tx.writeVersion(),
-                        0,
-                        entry.key().partition(),
-                        0)));
+                    entry.logMvccUpdate(tx, null, 0, 0, mvccVer);
 
                 entry.update(null, 0, 0, newVer, true);
 
@@ -5510,7 +5557,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
 
                 if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                    logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
+                    logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
                         cctx.cacheId(),
                         entry.key(),
                         val,
@@ -5520,7 +5567,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         newVer,
                         expireTime,
                         entry.key().partition(),
-                        0L)));
+                        0L,
+                        mvccVer)));
 
                 entry.update(val, expireTime, ttl, newVer, true);
 
@@ -6707,7 +6755,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             GridCacheVersion ver = tx.writeVersion();
 
             if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
+                logPtr = cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry(
                     cctx.cacheId(),
                     key,
                     val,
@@ -6716,7 +6764,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     ver,
                     CU.EXPIRE_TIME_ETERNAL,
                     key.partition(),
-                    0L)));
+                    0L,
+                    mvccVer)));
 
             update(val, expireTime, ttl, ver, true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f4e5b1b..2e2fa47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1498,7 +1498,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         pluginMgr.validate();
 
-        if (cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+        if (!recoveryMode && cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
             sharedCtx.coordinators().ensureStarted();
 
         sharedCtx.jta().registerCache(cfg);
@@ -2306,6 +2306,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         cacheContext.finishRecovery(cacheStartVer, updatedDescriptor.cacheConfiguration().isStatisticsEnabled());
 
+        if (cacheContext.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+            sharedCtx.coordinators().ensureStarted();
+
         onKernalStart(cacheContext.cache());
 
         if (log.isInfoEnabled())
@@ -5446,11 +5449,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
             for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
-                // Skip MVCC caches.
-                // TODO: https://issues.apache.org/jira/browse/IGNITE-10052
-                if (cacheDescriptor.cacheConfiguration().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
-                    continue;
-
                 startCacheInRecoveryMode(cacheDescriptor);
 
                 querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 66d927f..d311708 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -23,8 +23,8 @@ import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -412,6 +412,25 @@ public interface IgniteCacheOffheapManager {
     /**
      * @param cctx Cache context.
      * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     * @param expireTime Expire time.
+     * @param part Partition.
+     * @param mvccVer Mvcc version.
+     * @throws IgniteCheckedException If failed.
+     */
+    void mvccApplyUpdate(
+        GridCacheContext cctx,
+        KeyCacheObject key,
+        CacheObject val,
+        GridCacheVersion ver,
+        long expireTime,
+        GridDhtLocalPartition part,
+        MvccVersion mvccVer) throws IgniteCheckedException;
+
+    /**
+     * @param cctx Cache context.
+     * @param key Key.
      * @param partId Partition number.
      * @param part Partition.
      * @throws IgniteCheckedException If failed.
@@ -923,6 +942,24 @@ public interface IgniteCacheOffheapManager {
         public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
 
         /**
+         *
+         * @param cctx Cache context.
+         * @param key Key.
+         * @param val Value.
+         * @param ver Version.
+         * @param expireTime Expire time.
+         * @param mvccVer Mvcc version.
+         * @throws IgniteCheckedException
+         */
+        void mvccApplyUpdate(GridCacheContext cctx,
+            KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            long expireTime,
+            MvccVersion mvccVer
+        ) throws IgniteCheckedException;
+
+        /**
          * @param cctx Cache context.
          * @param key Key.
          * @param partId Partition number.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index f00c784..5f467b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -51,6 +51,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -626,6 +628,24 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
+    @Override public void mvccApplyUpdate(
+        GridCacheContext cctx,
+        KeyCacheObject key,
+        CacheObject val,
+        GridCacheVersion ver,
+        long expireTime,
+        GridDhtLocalPartition part,
+        MvccVersion mvccVer) throws IgniteCheckedException {
+
+        dataStore(part).mvccApplyUpdate(cctx,
+            key,
+            val,
+            ver,
+            expireTime,
+            mvccVer);
+    }
+
+    /** {@inheritDoc} */
     @Override public void remove(
         GridCacheContext cctx,
         KeyCacheObject key,
@@ -2512,6 +2532,92 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             }
         }
 
+        /** {@inheritDoc} */
+        @Override public void mvccApplyUpdate(GridCacheContext cctx,
+            KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            long expireTime,
+            MvccVersion mvccVer
+        ) throws IgniteCheckedException {
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+            try {
+                int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+                CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+                // Make sure value bytes initialized.
+                key.valueBytes(coCtx);
+
+                if (val != null)
+                    val.valueBytes(coCtx);
+
+                MvccSnapshotWithoutTxs mvccSnapshot = new MvccSnapshotWithoutTxs(mvccVer.coordinatorVersion(),
+                    mvccVer.counter(), mvccVer.operationCounter(), MvccUtils.MVCC_COUNTER_NA);
+
+                MvccUpdateDataRow updateRow = new MvccUpdateDataRow(
+                    cctx,
+                    key,
+                    val,
+                    ver,
+                    partId,
+                    0L,
+                    mvccSnapshot,
+                    null,
+                    null,
+                    false,
+                    false,
+                    false,
+                    false,
+                    false,
+                    false);
+
+                assert cctx.shared().database().checkpointLockIsHeldByThread();
+
+                dataTree.visit(new MvccMaxSearchRow(cacheId, key), new MvccMinSearchRow(cacheId, key), updateRow);
+
+                ResultType res = updateRow.resultType();
+
+                assert res == ResultType.PREV_NULL || res == ResultType.PREV_NOT_NULL : res;
+
+                if (res == ResultType.PREV_NOT_NULL) {
+                    CacheDataRow oldRow = updateRow.oldRow();
+
+                    assert oldRow != null && oldRow.link() != 0 : oldRow;
+
+                    rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot);
+                }
+
+                if (val != null) {
+                    if (!grp.storeCacheIdInDataPage() && updateRow.cacheId() != CU.UNDEFINED_CACHE_ID) {
+                        updateRow.cacheId(CU.UNDEFINED_CACHE_ID);
+
+                        rowStore.addRow(updateRow);
+
+                        updateRow.cacheId(cctx.cacheId());
+                    }
+                    else
+                        rowStore.addRow(updateRow);
+
+                    boolean old = dataTree.putx(updateRow);
+
+                    assert !old;
+
+                    GridCacheQueryManager qryMgr = cctx.queries();
+
+                    if (qryMgr.enabled())
+                        qryMgr.store(updateRow, null, true);
+
+                    cleanup(cctx, updateRow.cleanupRows());
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
         /**
          * @param cctx Cache context.
          * @param newRow New row.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 68669b7..cfa8eb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -36,7 +36,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -181,9 +180,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
     /** Batches already sent to remotes, but their acks are not received yet. */
     private ConcurrentMap<UUID, ConcurrentMap<Integer, Batch>> pending;
 
-    /** */
-    private WALPointer walPtr;
-
     /** Do not send DHT requests to near node. */
     protected boolean skipNearNodeUpdates;
 
@@ -531,12 +527,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
                 }
 
                 if (!hasNext0()) {
-                    if (walPtr != null && !cctx.tm().logTxRecords()) {
-                        cctx.shared().wal().flush(walPtr, true);
-
-                        walPtr = null; // Avoid additional flushing.
-                    }
-
                     if (!F.isEmpty(batches)) {
                         // Flush incomplete batches.
                         // Need to skip batches for nodes where first request (contains tx info) is still in-flight.
@@ -629,11 +619,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
 
         assert updRes != null && updRes.updateFuture() == null;
 
-        WALPointer ptr0 = updRes.loggedPointer();
-
-        if (ptr0 != null)
-            walPtr = ptr0;
-
         onEntryProcessed(entry.key(), updRes);
 
         if (!updRes.success())

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 07fbb9b..851a5ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -288,10 +289,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
     /** {@inheritDoc} */
     @Override public void ensureStarted() throws IgniteCheckedException {
-        if (!ctx.clientNode() && txLog == null) {
+        if (!ctx.clientNode()) {
             assert mvccEnabled && mvccSupported;
 
-            txLog = new TxLog(ctx, ctx.cache().context().database());
+            if (txLog == null)
+                txLog = new TxLog(ctx, ctx.cache().context().database());
 
             startVacuumWorkers();
 
@@ -327,6 +329,15 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     /** {@inheritDoc} */
     @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
         txLogPageStoreInit(mgr);
+
+        boolean hasMvccCaches = ctx.cache().persistentCaches().stream()
+            .anyMatch(c -> c.cacheConfiguration().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT);
+
+        if (hasMvccCaches) {
+            txLog = new TxLog(ctx, mgr);
+
+            mvccEnabled = true;
+        }
     }
 
     /**
@@ -1260,15 +1271,14 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
                                         if (log.isDebugEnabled())
                                             log.debug("Vacuum completed. " + metrics);
-                                    }
-                                    catch (NodeStoppingException ignored) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Cannot complete vacuum (node is stopping).");
-
-                                        metrics = new VacuumMetrics();
-                                    }
-                                    catch (Throwable e) {
-                                        ex = new GridClosureException(e);
+                                    } catch (Throwable e) {
+                                        if (X.hasCause(e, NodeStoppingException.class)) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Cannot complete vacuum (node is stopping).");
+
+                                            metrics = new VacuumMetrics();
+                                        } else
+                                            ex = new GridClosureException(e);
                                     }
 
                                     res.onDone(metrics, ex);
@@ -2399,27 +2409,27 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                     entry = cctx.cache().entryEx(key);
                 }
 
-                cctx.shared().database().checkpointReadLock();
-
                 int cleaned = 0;
 
                 try {
-                    if (cleanupRows != null)
-                        cleaned = part.dataStore().cleanup(cctx, cleanupRows);
+                    cctx.shared().database().checkpointReadLock();
 
-                    if (rest != null) {
-                        if (rest.getClass() == ArrayList.class) {
-                            for (MvccDataRow row : ((List<MvccDataRow>)rest)) {
-                                part.dataStore().updateTxState(cctx, row);
-                            }
+                    try {
+                        if (cleanupRows != null)
+                            cleaned = part.dataStore().cleanup(cctx, cleanupRows);
+
+                        if (rest != null) {
+                            if (rest.getClass() == ArrayList.class) {
+                                for (MvccDataRow row : ((List<MvccDataRow>) rest)) {
+                                    part.dataStore().updateTxState(cctx, row);
+                                }
+                            } else
+                                part.dataStore().updateTxState(cctx, (MvccDataRow) rest);
                         }
-                        else
-                            part.dataStore().updateTxState(cctx, (MvccDataRow)rest);
+                    } finally {
+                        cctx.shared().database().checkpointReadUnlock();
                     }
-                }
-                finally {
-                    cctx.shared().database().checkpointReadUnlock();
-
+                } finally {
                     entry.unlockEntry();
                     cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 972d4d9..f29e23f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -184,7 +184,14 @@ public class MvccUtils {
         if ((mvccOpCntr & MVCC_HINTS_MASK) != 0)
             return (byte)(mvccOpCntr >>> MVCC_HINTS_BIT_OFF);
 
-        return proc.state(mvccCrd, mvccCntr);
+        byte state = proc.state(mvccCrd, mvccCntr);
+
+        if ((state == TxState.NA || state == TxState.PREPARED)
+            && (proc.currentCoordinator() == null // Recovery from WAL.
+            || mvccCrd < proc.currentCoordinator().coordinatorVersion()))
+            state = TxState.ABORTED;
+
+        return state;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
index 03eb659..ca3053c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -236,18 +236,13 @@ public class TxLog implements DbCheckpointListener {
      * @throws IgniteCheckedException If failed.
      */
     public void put(TxKey key, byte state, boolean primary) throws IgniteCheckedException {
+        assert mgr.checkpointLockIsHeldByThread();
+
         Sync sync = syncObject(key);
 
         try {
-            mgr.checkpointReadLock();
-
-            try {
-                synchronized (sync) {
-                    tree.invoke(key, null, new TxLogUpdateClosure(key.major(), key.minor(), state, primary));
-                }
-            }
-            finally {
-                mgr.checkpointReadUnlock();
+            synchronized (sync) {
+                tree.invoke(key, null, new TxLogUpdateClosure(key.major(), key.minor(), state, primary));
             }
         } finally {
             evict(key, sync);
@@ -267,8 +262,14 @@ public class TxLog implements DbCheckpointListener {
         tree.iterate(LOWEST, clo, clo);
 
         if (clo.rows != null) {
-            for (TxKey row : clo.rows) {
-                remove(row);
+            mgr.checkpointReadLock();
+
+            try {
+                for (TxKey row : clo.rows)
+                    remove(row);
+            }
+            finally {
+                mgr.checkpointReadUnlock();
             }
         }
     }
@@ -278,17 +279,11 @@ public class TxLog implements DbCheckpointListener {
         Sync sync = syncObject(key);
 
         try {
-            mgr.checkpointReadLock();
-
-            try {
-                synchronized (sync) {
-                    tree.removex(key);
-                }
+            synchronized (sync) {
+                tree.removex(key);
             }
-            finally {
-                mgr.checkpointReadUnlock();
-            }
-        } finally {
+        }
+        finally {
             evict(key, sync);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 94ac100..393e137 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -95,6 +95,8 @@ import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
@@ -112,6 +114,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
@@ -157,6 +160,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
 import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedHashMap;
@@ -705,7 +709,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 metaStorage = createMetastorage(true);
 
-                applyLogicalUpdates(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false);
+                applyLogicalUpdates(status, g -> MetaStorage.METASTORAGE_CACHE_ID == g, false, true);
 
                 fillWalDisabledGroups();
 
@@ -2006,7 +2010,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             RestoreLogicalState logicalState = applyLogicalUpdates(
                     status,
                     g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g),
-                    true
+                    true,
+                    false
             );
 
             // Restore state for all groups.
@@ -2325,6 +2330,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         break;
 
                     switch (rec.type()) {
+                        case MVCC_DATA_RECORD:
                         case DATA_RECORD:
                             checkpointReadLock();
 
@@ -2361,6 +2367,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                             break;
 
+                        case MVCC_TX_RECORD:
+                            try {
+                                MvccTxRecord txRecord = (MvccTxRecord)rec;
+
+                                byte txState = convertToTxState(txRecord.state());
+
+                                cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
+                            }
+                            catch (IgniteCheckedException e) {
+                                throw new IgniteException(e);
+                            }
+
+                            break;
+
                         default:
                             // Skip other records.
                     }
@@ -2389,7 +2409,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private RestoreLogicalState applyLogicalUpdates(
         CheckpointStatus status,
         Predicate<Integer> cacheGroupsPredicate,
-        boolean skipFieldLookup
+        boolean skipFieldLookup,
+        boolean metaStoreOnly
     ) throws IgniteCheckedException {
         if (log.isInfoEnabled())
             log.info("Applying lost cache updates since last checkpoint record [lastMarked="
@@ -2414,6 +2435,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     break;
 
                 switch (rec.type()) {
+                    case MVCC_DATA_RECORD:
                     case DATA_RECORD:
                         DataRecord dataRec = (DataRecord)rec;
 
@@ -2485,6 +2507,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                         break;
 
+                    case MVCC_TX_RECORD:
+                        if (metaStoreOnly)
+                            continue;
+
+                        MvccTxRecord txRecord = (MvccTxRecord)rec;
+
+                        byte txState = convertToTxState(txRecord.state());
+
+                        cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
+
+                        break;
+
                     default:
                         // Skip other records.
                 }
@@ -2503,6 +2537,28 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Convert {@link TransactionState} to Mvcc {@link TxState}.
+     *
+     * @param state TransactionState.
+     * @return TxState.
+     */
+    private byte convertToTxState(TransactionState state) {
+        switch (state) {
+            case PREPARED:
+                return TxState.PREPARED;
+
+            case COMMITTED:
+                return TxState.COMMITTED;
+
+            case ROLLED_BACK:
+                return TxState.ABORTED;
+
+            default:
+                throw new IllegalStateException("Unsupported TxState.");
+        }
+    }
+
+    /**
      * Wal truncate callBack.
      *
      * @param highBound WALPointer.
@@ -2530,14 +2586,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         switch (dataEntry.op()) {
             case CREATE:
             case UPDATE:
-                cacheCtx.offheap().update(
-                    cacheCtx,
-                    dataEntry.key(),
-                    dataEntry.value(),
-                    dataEntry.writeVersion(),
-                    0L,
-                    locPart,
-                    null);
+                if (dataEntry instanceof MvccDataEntry) {
+                    cacheCtx.offheap().mvccApplyUpdate(
+                        cacheCtx,
+                        dataEntry.key(),
+                        dataEntry.value(),
+                        dataEntry.writeVersion(),
+                        0L,
+                        locPart,
+                        ((MvccDataEntry)dataEntry).mvccVer());
+                }
+                else {
+                    cacheCtx.offheap().update(
+                        cacheCtx,
+                        dataEntry.key(),
+                        dataEntry.value(),
+                        dataEntry.writeVersion(),
+                        0L,
+                        locPart,
+                        null);
+                }
 
                 if (dataEntry.partitionCounter() != 0)
                     cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
@@ -2545,7 +2613,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 break;
 
             case DELETE:
-                cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
+                if (dataEntry instanceof MvccDataEntry) {
+                    cacheCtx.offheap().mvccApplyUpdate(
+                        cacheCtx,
+                        dataEntry.key(),
+                        null,
+                        dataEntry.writeVersion(),
+                        0L,
+                        locPart,
+                        ((MvccDataEntry)dataEntry).mvccVer());
+                }
+                else
+                    cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
 
                 if (dataEntry.partitionCounter() != 0)
                     cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
@@ -4669,6 +4748,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     switch (rec.type()) {
                         case METASTORE_DATA_RECORD:
+                        case MVCC_DATA_RECORD:
                         case DATA_RECORD:
                             if (skipDataRecords)
                                 continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5f6511d..48e86e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1929,6 +1929,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public void mvccApplyUpdate(GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver,
+            long expireTime, MvccVersion mvccVer) throws IgniteCheckedException {
+            CacheDataStore delegate = init0(false);
+
+            delegate.mvccApplyUpdate(cctx, key, val, ver, expireTime, mvccVer);
+        }
+
+        /** {@inheritDoc} */
         @Override public CacheDataRow createRow(
             GridCacheContext cctx,
             KeyCacheObject key,

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index be5f55a..e6191dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -31,7 +31,10 @@ import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
 import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.UnwrapMvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -43,13 +46,13 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRe
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle;
-import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer.EncryptedDataEntry;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer.EncryptedDataEntry;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
@@ -350,11 +353,11 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
     }
 
     /** {@inheritDoc} */
-    @NotNull @Override protected WALRecord postProcessRecord(@NotNull final WALRecord rec) {
+    @Override protected @NotNull WALRecord postProcessRecord(@NotNull final WALRecord rec) {
         GridKernalContext kernalCtx = sharedCtx.kernalContext();
         IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
 
-        if (processor != null && rec.type() == RecordType.DATA_RECORD) {
+        if (processor != null && (rec.type() == RecordType.DATA_RECORD || rec.type() == RecordType.MVCC_DATA_RECORD)) {
             try {
                 return postProcessDataRecord((DataRecord)rec, kernalCtx, processor);
             }
@@ -409,7 +412,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
             postProcessedEntries.add(postProcessedEntry);
         }
 
-        DataRecord res = new DataRecord(postProcessedEntries, dataRec.timestamp());
+        DataRecord res = dataRec instanceof MvccDataRecord ?
+            new MvccDataRecord(postProcessedEntries, dataRec.timestamp()) :
+            new DataRecord(postProcessedEntries, dataRec.timestamp());
 
         res.size(dataRec.size());
         res.position(dataRec.position());
@@ -426,7 +431,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
      * @return post precessed entry
      * @throws IgniteCheckedException if failed
      */
-    @NotNull private DataEntry postProcessDataEntry(
+    private @NotNull DataEntry postProcessDataEntry(
         final IgniteCacheObjectProcessor processor,
         final CacheObjectContext fakeCacheObjCtx,
         final DataEntry dataEntry) throws IgniteCheckedException {
@@ -457,18 +462,47 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
             val = dataEntry.value();
         }
 
-        return new UnwrapDataEntry(
-            dataEntry.cacheId(),
-            key,
-            val,
-            dataEntry.op(),
-            dataEntry.nearXidVersion(),
-            dataEntry.writeVersion(),
-            dataEntry.expireTime(),
-            dataEntry.partitionId(),
-            dataEntry.partitionCounter(),
-            fakeCacheObjCtx,
-            keepBinary || marshallerMappingFileStoreDir == null);
+        return unwrapDataEntry(fakeCacheObjCtx, dataEntry, key, val, marshallerMappingFileStoreDir);
+    }
+
+    /**
+     * Unwrap data entry.
+     * @param coCtx CacheObject context.
+     * @param dataEntry Data entry.
+     * @param key Entry key.
+     * @param val Entry value.
+     * @param marshallerMappingFileStoreDir Marshaller directory.
+     * @return Unwrapped entry.
+     */
+    private @NotNull DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry,
+        KeyCacheObject key, CacheObject val, File marshallerMappingFileStoreDir) {
+        if (dataEntry instanceof MvccDataEntry)
+            return new UnwrapMvccDataEntry(
+                dataEntry.cacheId(),
+                key,
+                val,
+                dataEntry.op(),
+                dataEntry.nearXidVersion(),
+                dataEntry.writeVersion(),
+                dataEntry.expireTime(),
+                dataEntry.partitionId(),
+                dataEntry.partitionCounter(),
+                ((MvccDataEntry)dataEntry).mvccVer(),
+                coCtx,
+                keepBinary || marshallerMappingFileStoreDir == null);
+        else
+            return new UnwrapDataEntry(
+                dataEntry.cacheId(),
+                key,
+                val,
+                dataEntry.op(),
+                dataEntry.nearXidVersion(),
+                dataEntry.writeVersion(),
+                dataEntry.expireTime(),
+                dataEntry.partitionId(),
+                dataEntry.partitionCounter(),
+                coCtx,
+                keepBinary || marshallerMappingFileStoreDir == null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a719aa9c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 0ad92e3..a097361 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
@@ -46,6 +45,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateTxStateHintRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
@@ -68,7 +68,6 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuc
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId;
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord;
@@ -106,6 +105,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
@@ -122,8 +122,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
     /** Length of HEADER record data. */
     static final int HEADER_RECORD_DATA_SIZE = /*Magic*/8 + /*Version*/4;
 
-    /** Cache shared context */
-    private final GridCacheSharedContext cctx;
+    /** Cache shared context. */
+    protected final GridCacheSharedContext cctx;
 
     /** Size of page used for PageMemory regions. */
     private final int pageSize;
@@ -131,8 +131,8 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
     /** Size of page without encryption overhead. */
     private final int realPageSize;
 
-    /** Cache object processor to reading {@link DataEntry DataEntries} */
-    private final IgniteCacheObjectProcessor co;
+    /** Cache object processor to reading {@link DataEntry DataEntries}. */
+    protected final IgniteCacheObjectProcessor co;
 
     /** Serializer of {@link TxRecord} records. */
     private TxRecordSerializer txRecordSerializer;
@@ -1117,7 +1117,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
                 throw new EOFException("END OF SEGMENT");
 
             case TX_RECORD:
-                res = txRecordSerializer.read(in);
+                res = txRecordSerializer.readTx(in);
 
                 break;
 
@@ -1789,7 +1789,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
      * @param ver Version to write.
      * @param allowNull Is {@code null}version allowed.
      */
-    private static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
+    static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) {
         CacheVersionIO.write(buf, ver, allowNull);
     }
 
@@ -1971,7 +1971,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
      * @param allowNull Is {@code null}version allowed.
      * @return Read cache version.
      */
-    private GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
+    GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException {
         // To be able to read serialization protocol version.
         in.ensure(1);
 
@@ -1992,7 +1992,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
      * @return Full data record size.
      * @throws IgniteCheckedException If failed to obtain the length of one of the entries.
      */
-    private int dataSize(DataRecord dataRec) throws IgniteCheckedException {
+    protected int dataSize(DataRecord dataRec) throws IgniteCheckedException {
         boolean encrypted = isDataRecordEncrypted(dataRec);
 
         int sz = 0;
@@ -2018,7 +2018,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
      * @return Entry size.
      * @throws IgniteCheckedException If failed to get key or value bytes length.
      */
-    private int entrySize(DataEntry entry) throws IgniteCheckedException {
+    protected int entrySize(DataEntry entry) throws IgniteCheckedException {
         GridCacheContext cctx = this.cctx.cacheContext(entry.cacheId());
         CacheObjectContext coCtx = cctx.cacheObjectContext();
 
@@ -2058,7 +2058,11 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
         return size;
     }
 
+    /**
+     * Represents encrypted Data Entry ({@link #key}, {@link #val value}) pair.
+     */
     public static class EncryptedDataEntry extends DataEntry {
+        /** Constructor. */
         EncryptedDataEntry() {
             super(0, null, null, READ, null, null, 0, 0, 0);
         }