You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/30 12:24:33 UTC

[27/45] ignite git commit: IGNITE-4191: MVCC and transactional SQL support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov, Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov, Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java
new file mode 100644
index 0000000..c952a48
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer that folds every collected {@link VacuumMetrics} into one aggregate instance.
+ */
+public class VacuumMetricsReducer implements IgniteReducer<VacuumMetrics, VacuumMetrics> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 7063457745963917386L;
+
+    /** Aggregate that each collected metrics object is added to. */
+    private final VacuumMetrics m = new VacuumMetrics();
+
+    /** {@inheritDoc} Adds the four counters of {@code metrics} to the aggregate; always returns {@code true}. */
+    @Override public boolean collect(@Nullable VacuumMetrics metrics) {
+        assert metrics != null;
+
+        m.addCleanupRowsCnt(metrics.cleanupRowsCount());
+        m.addScannedRowsCount(metrics.scannedRowsCount());
+        m.addSearchNanoTime(metrics.searchNanoTime());
+        m.addCleanupNanoTime(metrics.cleanupNanoTime());
+
+        return true;
+    }
+
+    /** {@inheritDoc} Returns the aggregate instance itself rather than a copy. */
+    @Override public VacuumMetrics reduce() {
+        return m;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java
new file mode 100644
index 0000000..9a0d9e2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Task for cleaning a single partition, exposed as a future of {@link VacuumMetrics}.
+ */
+public class VacuumTask extends GridFutureAdapter<VacuumMetrics> {
+    /** MVCC snapshot supplied at construction. */
+    private final MvccSnapshot snapshot;
+
+    /** Partition to cleanup; {@link #toString()} reports its id as {@code partId}. */
+    @GridToStringExclude
+    private final GridDhtLocalPartition part;
+
+    /**
+     * @param snapshot Snapshot.
+     * @param part Partition to cleanup.
+     */
+    VacuumTask(MvccSnapshot snapshot, GridDhtLocalPartition part) {
+        this.snapshot = snapshot;
+        this.part = part;
+    }
+
+    /**
+     * @return Snapshot.
+     */
+    public MvccSnapshot snapshot() {
+        return snapshot;
+    }
+
+    /**
+     * @return Partition to cleanup.
+     */
+    public GridDhtLocalPartition part() {
+        return part;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VacuumTask.class, this, "partId", part.id());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java
new file mode 100644
index 0000000..0156c53
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * MVCC ack request message that carries a single query counter.
+ */
+public class MvccAckRequestQueryCntr implements MvccMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Query counter. */
+    private long cntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccAckRequestQueryCntr() {
+        // No-op.
+    }
+
+    /**
+     * @param cntr Query counter.
+     */
+    public MvccAckRequestQueryCntr(long cntr) {
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} Always {@code false} for this message. */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} Always {@code true} for this message. */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} Writes the header if not yet written, then the single {@code cntr} field. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the single {@code cntr} field; returns {@code false} if that read did not complete. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccAckRequestQueryCntr.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 140;
+    }
+
+    /** {@inheritDoc} One field: {@code cntr}. */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccAckRequestQueryCntr.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java
new file mode 100644
index 0000000..7771f4d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * MVCC ack request message that carries a single query tracker id.
+ */
+public class MvccAckRequestQueryId implements MvccMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Query tracker id. */
+    private long qryTrackerId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccAckRequestQueryId() {
+        // No-op.
+    }
+
+    /**
+     * @param qryTrackerId Query tracker Id.
+     */
+    public MvccAckRequestQueryId(long qryTrackerId) {
+        this.qryTrackerId = qryTrackerId;
+    }
+
+    /** {@inheritDoc} Always {@code false} for this message. */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} Always {@code true} for this message. */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Query tracker id.
+     */
+    public long queryTrackerId() {
+        return qryTrackerId;
+    }
+
+    /** {@inheritDoc} Writes the header if not yet written, then the single {@code qryTrackerId} field. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("qryTrackerId", qryTrackerId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the single {@code qryTrackerId} field; returns {@code false} if that read did not complete. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                qryTrackerId = reader.readLong("qryTrackerId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccAckRequestQueryId.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 145;
+    }
+
+    /** {@inheritDoc} One field: {@code qryTrackerId}. */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccAckRequestQueryId.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java
new file mode 100644
index 0000000..69dfd25
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
+
+/**
+ * MVCC ack request message for a transaction: carries a future ID, the transaction counter and a flags byte.
+ */
+public class MvccAckRequestTx implements MvccMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Bit in {@link #flags} that is set when no response message is needed. */
+    private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+    /** Future ID. */
+    private long futId;
+
+    /** Counter assigned to transaction. */
+    private long txCntr;
+
+    /** Bit flags; see {@link #SKIP_RESPONSE_FLAG_MASK}. */
+    private byte flags;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccAckRequestTx() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction.
+     */
+    public MvccAckRequestTx(long futId, long txCntr) {
+        this.futId = futId;
+        this.txCntr = txCntr;
+    }
+
+    /**
+     * @return Query counter; always {@code MVCC_COUNTER_NA} in this class.
+     */
+    public long queryCounter() {
+        return MVCC_COUNTER_NA;
+    }
+
+    /**
+     * @return Query tracker id; always {@code MVCC_TRACKER_ID_NA} in this class.
+     */
+    public long queryTrackerId() {
+        return MVCC_TRACKER_ID_NA;
+    }
+
+    /** {@inheritDoc} Always {@code false} for this message. */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} Always {@code true} for this message. */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return {@code True} if response message is not needed.
+     */
+    public boolean skipResponse() {
+        return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param val {@code True} if response message is not needed.
+     */
+    public void skipResponse(boolean val) {
+        if (val)
+            flags |= SKIP_RESPONSE_FLAG_MASK;
+        else
+            flags &= ~SKIP_RESPONSE_FLAG_MASK;
+    }
+
+    /**
+     * @return Counter assigned to transaction.
+     */
+    public long txCounter() {
+        return txCntr;
+    }
+
+    /** {@inheritDoc} Writes {@code flags}, {@code futId}, {@code txCntr} in that order; each case falls through to the next. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("txCntr", txCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads {@code flags}, {@code futId}, {@code txCntr} in that order; each case falls through to the next. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                txCntr = reader.readLong("txCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccAckRequestTx.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 137;
+    }
+
+    /** {@inheritDoc} Three fields: {@code flags}, {@code futId}, {@code txCntr}. */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccAckRequestTx.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java
new file mode 100644
index 0000000..99761c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * {@link MvccAckRequestTx} variant that additionally carries a query counter.
+ */
+public class MvccAckRequestTxAndQueryCntr extends MvccAckRequestTx {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Counter assigned for transaction reads. */
+    private long qryCntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccAckRequestTxAndQueryCntr() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryCntr Counter assigned for transaction reads.
+     */
+    public MvccAckRequestTxAndQueryCntr(long futId, long txCntr, long qryCntr) {
+        super(futId, txCntr);
+
+        this.qryCntr = qryCntr;
+    }
+
+    /** {@inheritDoc} Returns the counter passed to the constructor or read from the wire. */
+    @Override public long queryCounter() {
+        return qryCntr;
+    }
+
+    /** {@inheritDoc} Delegates to the superclass first, then writes {@code qryCntr} when the writer state is 3. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("qryCntr", qryCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Delegates to the superclass first, then reads {@code qryCntr} when the reader state is 3. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                qryCntr = reader.readLong("qryCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccAckRequestTxAndQueryCntr.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 146;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccAckRequestTxAndQueryCntr.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java
new file mode 100644
index 0000000..89f09db
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * {@link MvccAckRequestTx} variant that additionally carries a query tracker id.
+ */
+public class MvccAckRequestTxAndQueryId extends MvccAckRequestTx {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Query tracker id. */
+    private long qryTrackerId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccAckRequestTxAndQueryId() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryTrackerId Query tracker id.
+     */
+    public MvccAckRequestTxAndQueryId(long futId, long txCntr, long qryTrackerId) {
+        super(futId, txCntr);
+
+        this.qryTrackerId = qryTrackerId;
+    }
+
+    /** {@inheritDoc} Returns the id passed to the constructor or read from the wire. */
+    @Override public long queryTrackerId() {
+        return qryTrackerId;
+    }
+
+    /** {@inheritDoc} Delegates to the superclass first, then writes {@code qryTrackerId} when the writer state is 3. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("qryTrackerId", qryTrackerId))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Delegates to the superclass first, then reads {@code qryTrackerId} when the reader state is 3. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                qryTrackerId = reader.readLong("qryTrackerId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccAckRequestTxAndQueryId.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 147;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccAckRequestTxAndQueryId.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java
new file mode 100644
index 0000000..4b78c24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * MVCC message that carries a list of active queries as a {@link GridLongList}.
+ */
+public class MvccActiveQueriesMessage implements MvccMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Active queries. */
+    private GridLongList activeQrys;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccActiveQueriesMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param activeQrys Active queries.
+     */
+    public MvccActiveQueriesMessage(GridLongList activeQrys) {
+        this.activeQrys = activeQrys;
+    }
+
+    /**
+     * @return Active queries, possibly {@code null}.
+     */
+    @Nullable public GridLongList activeQueries() {
+        return activeQrys;
+    }
+
+    /** {@inheritDoc} Always {@code false} for this message. */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} Always {@code true} for this message. */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /** {@inheritDoc} Writes the header if not yet written, then {@code activeQrys} as a nested message. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("activeQrys", activeQrys))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads {@code activeQrys} as a nested message; returns {@code false} if that read did not complete. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                activeQrys = reader.readMessage("activeQrys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccActiveQueriesMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 149;
+    }
+
+    /** {@inheritDoc} One field: {@code activeQrys}. */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccActiveQueriesMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java
new file mode 100644
index 0000000..72e4c52
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * MVCC response message whose only payload is a future ID.
+ */
+public class MvccFutureResponse implements MvccMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private long futId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccFutureResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     */
+    public MvccFutureResponse(long futId) {
+        this.futId = futId;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} This message does not wait for coordinator initialization. */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} This message is not processed from the NIO thread. */
+    @Override public boolean processedFromNioThread() {
+        return false;
+    }
+
+    /** {@inheritDoc} Writes the header (once) followed by {@code futId}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // The writer state selects the field to write; it is advanced only after a successful write.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Restores {@code futId} through the given reader. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // The reader state selects the field to read; it is advanced only after the read is confirmed.
+        switch (reader.state()) {
+            case 0:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccFutureResponse.class);
+    }
+
+    /** {@inheritDoc} Type code of this message: {@code 138}. */
+    @Override public short directType() {
+        return 138;
+    }
+
+    /** {@inheritDoc} One field is serialized: {@code futId}. */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccFutureResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java
new file mode 100644
index 0000000..6d8b3c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * Common interface for all MVCC-related messages.
+ */
+public interface MvccMessage extends Message {
+    /**
+     * @return {@code True} if the message should wait for coordinator initialization.
+     */
+    public boolean waitForCoordinatorInit();
+
+    /**
+     * @return {@code True} if the message should be processed from the NIO thread.
+     */
+    public boolean processedFromNioThread();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java
new file mode 100644
index 0000000..75d33a7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request to get an MVCC snapshot for a query; its only payload is a future ID.
+ */
+public class MvccQuerySnapshotRequest implements MvccMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private long futId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccQuerySnapshotRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     */
+    public MvccQuerySnapshotRequest(long futId) {
+        this.futId = futId;
+    }
+
+    /** {@inheritDoc} This request waits for coordinator initialization. */
+    @Override public boolean waitForCoordinatorInit() {
+        return true;
+    }
+
+    /** {@inheritDoc} This request is processed from the NIO thread. */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} Writes the header (once) followed by {@code futId}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // The writer state selects the field to write; it is advanced only after a successful write.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Restores {@code futId} through the given reader. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // The reader state selects the field to read; it is advanced only after the read is confirmed.
+        switch (reader.state()) {
+            case 0:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccQuerySnapshotRequest.class);
+    }
+
+    /** {@inheritDoc} Type code of this message: {@code 139}. */
+    @Override public short directType() {
+        return 139;
+    }
+
+    /** {@inheritDoc} One field is serialized: {@code futId}. */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccQuerySnapshotRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java
new file mode 100644
index 0000000..196003c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Snapshot response message; also acts as the {@link MvccSnapshot} and as its own active transactions list.
+ */
+public class MvccSnapshotResponse implements MvccMessage, MvccSnapshot, MvccLongList {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private long futId;
+
+    /** Coordinator version. */
+    private long crdVer;
+
+    /** Counter. */
+    private long cntr;
+
+    /** Operation counter. */
+    private int opCntr;
+
+    /** Number of used elements in {@link #txs}; not written, rebuilt from the array length in {@code readFrom}. */
+    @GridDirectTransient
+    private int txsCnt;
+
+    /** Transaction counters added via {@link #addTx(long)}; {@code null} until the first one is added. */
+    private long[] txs;
+
+    /** Cleanup version. */
+    private long cleanupVer;
+
+    /** Tracking number; not written by {@code writeTo}. */
+    @GridDirectTransient
+    private long tracking;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccSnapshotResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
+     * @param opCntr Operation counter.
+     * @param cleanupVer Cleanup version.
+     * @param tracking Tracking number.
+     */
+    public void init(long futId, long crdVer, long cntr, int opCntr, long cleanupVer, long tracking) {
+        this.futId = futId;
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+        this.opCntr = opCntr;
+        this.cleanupVer = cleanupVer;
+        this.tracking = tracking;
+
+        if (txsCnt > 0 && txs.length > txsCnt) // Trim spare capacity: the reader takes the count from txs.length.
+            txs = Arrays.copyOf(txs, txsCnt);
+    }
+
+    /**
+     * @param txId Transaction counter.
+     */
+    public void addTx(long txId) {
+        if (txs == null)
+            txs = new long[4];
+        else if (txs.length == txsCnt)
+            txs = Arrays.copyOf(txs, txs.length << 1); // Array is full: double its capacity.
+
+        txs[txsCnt++] = txId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return txsCnt;
+    }
+
+    /** {@inheritDoc} Note: {@code i} is not checked against {@link #size()}. */
+    @Override public long get(int i) {
+        return txs[i];
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean contains(long val) {
+        for (int i = 0; i < txsCnt; i++) {
+            if (txs[i] == val)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @return Tracking counter.
+     */
+    public long tracking() {
+        return tracking;
+    }
+
+    /** {@inheritDoc} This message does not wait for coordinator initialization. */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} This message is not processed from the NIO thread. */
+    @Override public boolean processedFromNioThread() {
+        return false;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long cleanupVersion() {
+        return cleanupVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int operationCounter() {
+        return opCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrementOperationCounter() {
+        opCntr++;
+    }
+
+    /** {@inheritDoc} This object is its own list of active transactions. */
+    @Override public MvccLongList activeTransactions() {
+        return this;
+    }
+
+    /** {@inheritDoc} Returns {@code this} when no transactions were added, otherwise a copy without them. */
+    @Override public MvccSnapshot withoutActiveTransactions() {
+        if (txsCnt > 0)
+            return new MvccSnapshotWithoutTxs(crdVer, cntr, opCntr, cleanupVer);
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /** {@inheritDoc} Writes the header (once) followed by the six serialized fields in {@code case} order. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // Cases fall through on purpose: writing resumes at the field selected by the writer state.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cleanupVer", cleanupVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeInt("opCntr", opCntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeLongArray("txs", txs))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Restores the six serialized fields and recomputes {@code txsCnt}. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // Cases fall through on purpose: reading resumes at the field selected by the reader state.
+        switch (reader.state()) {
+            case 0:
+                cleanupVer = reader.readLong("cleanupVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                opCntr = reader.readInt("opCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                txs = reader.readLongArray("txs");
+
+                if (!reader.isLastRead())
+                    return false;
+                // txsCnt is not transferred: rebuild it from the received array.
+                txsCnt = txs != null ? txs.length : 0;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccSnapshotResponse.class);
+    }
+
+    /** {@inheritDoc} Type code of this message: {@code 141}. */
+    @Override public short directType() {
+        return 141;
+    }
+
+    /** {@inheritDoc} Six fields are serialized; {@code txsCnt} and {@code tracking} are not. */
+    @Override public byte fieldsCount() {
+        return 6;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccSnapshotResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java
new file mode 100644
index 0000000..cd30eb8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request to get an MVCC snapshot for a new transaction; its only payload is a future ID.
+ */
+public class MvccTxSnapshotRequest implements MvccMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private long futId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccTxSnapshotRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     */
+    public MvccTxSnapshotRequest(long futId) {
+        this.futId = futId;
+    }
+
+    /** {@inheritDoc} This request waits for coordinator initialization. */
+    @Override public boolean waitForCoordinatorInit() {
+        return true;
+    }
+
+    /** {@inheritDoc} This request is processed from the NIO thread. */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} Writes the header (once) followed by {@code futId}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // The writer state selects the field to write; it is advanced only after a successful write.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Restores {@code futId} through the given reader. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // The reader state selects the field to read; it is advanced only after the read is confirmed.
+        switch (reader.state()) {
+            case 0:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(MvccTxSnapshotRequest.class);
+    }
+
+    /** {@inheritDoc} Type code of this message: {@code 136}. */
+    @Override public short directType() {
+        return 136;
+    }
+
+    /** {@inheritDoc} One field is serialized: {@code futId}. */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccTxSnapshotRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
new file mode 100644
index 0000000..ae57507
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request carrying a future ID and the list of transactions to wait for.
+ */
+public class MvccWaitTxsRequest implements MvccMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private long futId;
+
+    /** Transactions to wait for. */
+    private GridLongList txs;
+
+    /**
+     * Empty constructor; {@code readFrom} assigns both fields.
+     */
+    public MvccWaitTxsRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txs Transactions to wait for; asserted to be non-null and non-empty.
+     */
+    public MvccWaitTxsRequest(long futId, GridLongList txs) {
+        assert txs != null && txs.size() > 0 : txs;
+
+        this.futId = futId;
+        this.txs = txs;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Transactions to wait for.
+     */
+    public GridLongList transactions() {
+        return txs;
+    }
+
+    /** {@inheritDoc} This request does not wait for coordinator initialization. */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} This request is processed from the NIO thread. */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /** {@inheritDoc} Writes the header (once) followed by {@code futId} and {@code txs}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // Cases fall through on purpose: writing resumes at the field selected by the writer state.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMessage("txs", txs))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Restores {@code futId} and {@code txs} through the given reader. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // Cases fall through on purpose: reading resumes at the field selected by the reader state.
+        switch (reader.state()) {
+            case 0:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                txs = reader.readMessage("txs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccWaitTxsRequest.class);
+    }
+
+    /** {@inheritDoc} Type code of this message: {@code 142}. */
+    @Override public short directType() {
+        return 142;
+    }
+
+    /** {@inheritDoc} Two fields are serialized: {@code futId} and {@code txs}. */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccWaitTxsRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java
new file mode 100644
index 0000000..92aff7b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Immutable pair of a major and a minor version number, compared by value.
+ */
+public class TxKey {
+    /** Major version. */
+    private final long major;
+
+    /** Minor version. */
+    private final long minor;
+
+    /**
+     * @param major Major version.
+     * @param minor Minor version.
+     */
+    public TxKey(long major, long minor) {
+        this.minor = minor;
+        this.major = major;
+    }
+
+    /**
+     * @return Major version.
+     */
+    public long major() {
+        return major;
+    }
+
+    /**
+     * @return Minor version.
+     */
+    public long minor() {
+        return minor;
+    }
+
+    /** {@inheritDoc} Equal only to another {@code TxKey} (exact class) with the same major and minor values. */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || TxKey.class != o.getClass())
+            return false;
+
+        return ((TxKey)o).major == major && ((TxKey)o).minor == minor;
+    }
+
+    /** {@inheritDoc} Folds each 64-bit value to 32 bits and combines them with the multiplier 31. */
+    @Override public int hashCode() {
+        return 31 * (int)(major ^ (major >>> 32))
+            + (int)(minor ^ (minor >>> 32));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TxKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
new file mode 100644
index 0000000..905bfc4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
+
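+/**
+ * Transaction log: keeps a state byte per transaction version ({@link TxKey}) in a {@link TxLogTree}.
+ */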
+public class TxLog implements DbCheckpointListener {
+    /** Name of the transaction log cache; also used to look up its data region and reuse list. */
+    public static final String TX_LOG_CACHE_NAME = "TxLog";
+
+    /** Cache ID derived from {@link #TX_LOG_CACHE_NAME}. */
+    public static final int TX_LOG_CACHE_ID = CU.cacheId(TX_LOG_CACHE_NAME);
+
+    /** Starting key used when the tree is scanned from its beginning. */
+    private static final TxKey LOWEST = new TxKey(0, 0);
+
+    /** Database shared manager. */
+    private final IgniteCacheDatabaseSharedManager mgr;
+
+    /** Reuse list owned by this log; assigned only when persistence is enabled. */
+    private ReuseListImpl reuseList;
+
+    /** Tree holding the transaction state rows. */
+    private TxLogTree tree;
+
+    /**
+     * Reference-counted per-key synchronization objects. An entry exists only while at least one
+     * update or removal of that key is in flight. The map itself is never replaced, hence {@code final}.
+     */
+    private final ConcurrentMap<TxKey, Sync> keyMap = new ConcurrentHashMap<>();
+
+    /**
+     * Creates the transaction log and immediately initializes its underlying tree.
+     *
+     * @param ctx Kernal context.
+     * @param mgr Database shared manager.
+     * @throws IgniteCheckedException If initialization failed.
+     */
+    public TxLog(GridKernalContext ctx, IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+        this.mgr = mgr;
+
+        init(ctx);
+    }
+
+    /**
+     * Creates the log tree.
+     * <p>
+     * With persistence enabled, the tree root and reuse list root are taken from the meta page of
+     * partition {@code 0} of the TxLog cache. If that page is not a partition meta page yet, it is
+     * initialized and both roots are allocated. All of this happens under the checkpoint read lock,
+     * and this log is registered as a checkpoint listener.
+     * <p>
+     * Without persistence, a new tree is created on top of the data region's shared reuse list.
+     *
+     * @param ctx Kernal context.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void init(GridKernalContext ctx) throws IgniteCheckedException {
+        if (CU.isPersistenceEnabled(ctx.config())) {
+            mgr.checkpointReadLock();
+
+            try {
+                IgniteWriteAheadLogManager wal = ctx.cache().context().wal();
+                PageMemoryEx pageMemory = (PageMemoryEx)mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();
+
+                long partMetaId = pageMemory.partitionMetaPageId(TX_LOG_CACHE_ID, 0);
+                long partMetaPage = pageMemory.acquirePage(TX_LOG_CACHE_ID, partMetaId);
+
+                long treeRoot, reuseListRoot;
+
+                // Becomes true when the meta page had to be initialized, i.e. both roots were just allocated.
+                boolean isNew = false;
+
+                try {
+                    long pageAddr = pageMemory.writeLock(TX_LOG_CACHE_ID, partMetaId, partMetaPage);
+
+                    try {
+                        if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
+                            // Initialize new page.
+                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();
+
+                            io.initNewPage(pageAddr, partMetaId, pageMemory.pageSize());
+
+                            treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, 0, PageMemory.FLAG_DATA);
+                            reuseListRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, 0, PageMemory.FLAG_DATA);
+
+                            assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
+                            assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
+
+                            // Remember both roots in the meta page so that the branch below can read them back.
+                            io.setTreeRoot(pageAddr, treeRoot);
+                            io.setReuseListRoot(pageAddr, reuseListRoot);
+
+                            // Log the meta page initialization to WAL when a delta record is required.
+                            if (PageHandler.isWalDeltaRecordNeeded(pageMemory, TX_LOG_CACHE_ID, partMetaId, partMetaPage, wal, null))
+                                wal.log(new MetaPageInitRecord(
+                                    TX_LOG_CACHE_ID,
+                                    partMetaId,
+                                    io.getType(),
+                                    io.getVersion(),
+                                    treeRoot,
+                                    reuseListRoot
+                                ));
+
+                            isNew = true;
+                        }
+                        else {
+                            // Meta page already exists: read the previously stored roots.
+                            PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
+
+                            treeRoot = io.getTreeRoot(pageAddr);
+                            reuseListRoot = io.getReuseListRoot(pageAddr);
+
+                            assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA :
+                                U.hexLong(treeRoot) + ", part=" + 0 + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID;
+                            assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA :
+                                U.hexLong(reuseListRoot) + ", part=" + 0 + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID;
+                        }
+                    }
+                    finally {
+                        // NOTE(review): the last argument is presumably the "dirty" flag, set only when the
+                        // page was modified above - confirm against PageMemoryEx.writeUnlock().
+                        pageMemory.writeUnlock(TX_LOG_CACHE_ID, partMetaId, partMetaPage, null, isNew);
+                    }
+                }
+                finally {
+                    pageMemory.releasePage(TX_LOG_CACHE_ID, partMetaId, partMetaPage);
+                }
+
+                reuseList = new ReuseListImpl(
+                    TX_LOG_CACHE_ID,
+                    TX_LOG_CACHE_NAME,
+                    pageMemory,
+                    wal,
+                    reuseListRoot,
+                    isNew);
+
+                tree = new TxLogTree(pageMemory, wal, treeRoot, reuseList, ctx.failure(), isNew);
+
+                // Registered only in this branch, after reuseList has been assigned (see onCheckpointBegin).
+                ((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this);
+            }
+            finally {
+                mgr.checkpointReadUnlock();
+            }
+        }
+        else {
+            PageMemory pageMemory = mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();
+            ReuseList reuseList1 = mgr.reuseList(TX_LOG_CACHE_NAME);
+
+            long treeRoot;
+
+            // Prefer a recycled page for the tree root; allocate a new index page when none is available.
+            if ((treeRoot = reuseList1.takeRecycledPage()) == 0L)
+                treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, INDEX_PARTITION, FLAG_IDX);
+
+            // No WAL in this mode, and the tree is always created as new.
+            tree = new TxLogTree(pageMemory, null, treeRoot, reuseList1, ctx.failure(), true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+        // reuseList is assigned only in the persistence branch of init(), which is also the only place
+        // where this listener gets registered, so it is expected to be non-null here.
+        reuseList.saveMetadata();
+    }
+
+    /**
+     *
+     * @param major Major version.
+     * @param minor Minor version.
+     * @return Transaction state for given version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public byte get(long major, long minor) throws IgniteCheckedException {
+        return get(new TxKey(major, minor));
+    }
+
+    /**
+     * Looks up the state stored in the tree for the given key.
+     *
+     * @param key Transaction key.
+     * @return Transaction state for given version, or {@code TxState.NA} if there is no record for the key.
+     * @throws IgniteCheckedException If failed.
+     */
+    public byte get(TxKey key) throws IgniteCheckedException {
+        TxRow found = tree.findOne(key);
+
+        if (found != null)
+            return found.state();
+
+        return TxState.NA;
+    }
+
+    /**
+     * Writes a new state for the given transaction key. The update runs under the checkpoint read lock
+     * and under the per-key monitor, so that updates and removals of the same key do not interleave.
+     *
+     * @param key TxKey.
+     * @param state Transaction state for given version.
+     * @param primary Flag if this is a primary node.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void put(TxKey key, byte state, boolean primary) throws IgniteCheckedException {
+        Sync keySync = syncObject(key);
+
+        try {
+            mgr.checkpointReadLock();
+
+            try {
+                synchronized (keySync) {
+                    TxLogUpdateClosure updClo = new TxLogUpdateClosure(key.major(), key.minor(), state, primary);
+
+                    tree.invoke(key, null, updClo);
+                }
+            }
+            finally {
+                mgr.checkpointReadUnlock();
+            }
+        }
+        finally {
+            // Always give the reference taken by syncObject() back.
+            evict(key, keySync);
+        }
+    }
+
+    /**
+     * Removes all records less or equals to the given version.
+     * <p>
+     * The keys are first collected by a scan starting from the lowest key and then removed one by one.
+     *
+     * @param major Major version.
+     * @param minor Minor version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeUntil(long major, long minor) throws IgniteCheckedException {
+        // The same object is passed both as the bound of the scan and as the closure collecting the keys.
+        TraversingClosure bound = new TraversingClosure(major, minor);
+
+        tree.iterate(LOWEST, bound, bound);
+
+        List<TxKey> collected = bound.rows;
+
+        // Stays null when the scan has not visited a single row.
+        if (collected == null)
+            return;
+
+        for (TxKey k : collected)
+            remove(k);
+    }
+
+    /**
+     * Removes a single record under the checkpoint read lock and under the per-key monitor.
+     *
+     * @param key Key of the record to remove.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void remove(TxKey key) throws IgniteCheckedException {
+        Sync keySync = syncObject(key);
+
+        try {
+            mgr.checkpointReadLock();
+
+            try {
+                synchronized (keySync) {
+                    tree.removex(key);
+                }
+            }
+            finally {
+                mgr.checkpointReadUnlock();
+            }
+        }
+        finally {
+            // Always give the reference taken by syncObject() back.
+            evict(key, keySync);
+        }
+    }
+
+    /**
+     * Acquires the synchronization object for the given key, creating and registering it if necessary.
+     * On return, one reference is held on behalf of the caller, who must release it with
+     * {@link #evict(TxKey, Sync)}.
+     *
+     * @param key Transaction key.
+     * @return Synchronization object registered in {@code keyMap} for the key.
+     */
+    private Sync syncObject(TxKey key) {
+        Sync sync = keyMap.get(key);
+
+        while (true) {
+            if (sync == null) {
+                // A new Sync starts with counter == 1, which already accounts for this caller.
+                Sync old = keyMap.putIfAbsent(key, sync = new Sync());
+
+                if (old == null)
+                    return sync;
+                else
+                    // Lost the race: try to take a reference on the instance registered by someone else.
+                    sync = old;
+            }
+            else {
+                int cntr = sync.counter;
+
+                // Increment only while the counter is positive; zero means the instance is being evicted.
+                while (cntr > 0) {
+                    if (sync.casCounter(cntr, cntr + 1))
+                        return sync;
+
+                    cntr = sync.counter;
+                }
+
+                // The instance is dead: re-read the map until it is gone or replaced.
+                sync = keyMap.get(key);
+            }
+        }
+    }
+
+    /**
+     * Releases one reference previously taken by {@link #syncObject(TxKey)}. The caller that drops the
+     * counter from {@code 1} to {@code 0} also removes the mapping from {@code keyMap}.
+     *
+     * @param key Transaction key.
+     * @param sync Synchronization object obtained for the key.
+     */
+    private void evict(TxKey key, Sync sync) {
+        assert sync != null;
+
+        int cntr = sync.counter;
+
+        while (true) {
+            assert cntr > 0;
+
+            // Retry with a fresh value if another thread changed the counter in between.
+            if (!sync.casCounter(cntr, cntr - 1)) {
+                cntr = sync.counter;
+
+                continue;
+            }
+
+            // cntr holds the value before the decrement, so 1 means this was the last reference.
+            if (cntr == 1) {
+                boolean removed = keyMap.remove(key, sync);
+
+                assert removed;
+            }
+
+            break;
+        }
+    }
+
+    /**
+     * A {@link TxKey} that doubles as a tree row closure: it is handed to the tree scan both as a bound
+     * and as the closure which records the key of every visited row.
+     */
+    private static class TraversingClosure extends TxKey implements BPlusTree.TreeRowClosure<TxKey, TxRow> {
+        /** Keys collected so far; stays {@code null} until the first row is visited. */
+        private List<TxKey> rows;
+
+        /**
+         * @param major Major version.
+         * @param minor Minor version.
+         */
+        TraversingClosure(long major, long minor) {
+            super(major, minor);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(BPlusTree<TxKey, TxRow> tree, BPlusIO<TxKey> io, long pageAddr,
+            int idx) throws IgniteCheckedException {
+            List<TxKey> acc = rows;
+
+            // Allocate the list lazily, on the first visited row.
+            if (acc == null)
+                rows = acc = new ArrayList<>();
+
+            TxLogIO txIo = (TxLogIO)io;
+            int itemOff = io.offset(idx);
+
+            long rowMajor = txIo.getMajor(pageAddr, itemOff);
+            long rowMinor = txIo.getMinor(pageAddr, itemOff);
+
+            acc.add(new TxKey(rowMajor, rowMinor));
+
+            // Every visited row is accepted.
+            return true;
+        }
+    }
+
+    /**
+     * Reference counter attached to a key in {@code keyMap}. Instances are also used as monitors
+     * for updates of that key.
+     */
+    private static class Sync {
+        /** CAS accessor for {@link #counter}. */
+        private static final AtomicIntegerFieldUpdater<Sync> CNTR_UPD =
+            AtomicIntegerFieldUpdater.newUpdater(Sync.class, "counter");
+
+        /** Number of references currently held; a new instance starts with one, for its creator. */
+        volatile int counter = 1;
+
+        /**
+         * @param old Expected counter value.
+         * @param upd New counter value.
+         * @return {@code true} if the counter was equal to {@code old} and has been set to {@code upd}.
+         */
+        boolean casCounter(int old, int upd) {
+            return CNTR_UPD.compareAndSet(this, old, upd);
+        }
+    }
+
+    /**
+     * TxLog update closure. Given the row currently stored for a key (if any), it decides whether the
+     * requested state transition is written ({@code PUT}), silently skipped ({@code NOOP}) or rejected
+     * with an {@link IllegalStateException}.
+     */
+    private static final class TxLogUpdateClosure implements IgniteTree.InvokeClosure<TxRow> {
+        /** Major version. */
+        private final long major;
+
+        /** Minor version. */
+        private final long minor;
+
+        /** State to be written. */
+        private final byte newState;
+
+        /** Flag if this is a primary node. */
+        private final boolean primary;
+
+        /** Resolved tree operation; {@code null} until {@link #call(TxRow)} has made a decision. */
+        private IgniteTree.OperationType treeOp;
+
+        /**
+         * @param major Coordinator version.
+         * @param minor Counter.
+         * @param newState New transaction state.
+         * @param primary Flag if this is primary node.
+         */
+        TxLogUpdateClosure(long major, long minor, byte newState, boolean primary) {
+            assert major > MVCC_CRD_COUNTER_NA && minor > MVCC_COUNTER_NA && newState != TxState.NA;
+
+            this.primary = primary;
+            this.newState = newState;
+            this.minor = minor;
+            this.major = major;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable TxRow row) {
+            // Nothing is stored for the key yet: the new state is written as is.
+            if (row == null) {
+                valid();
+
+                return;
+            }
+
+            byte cur = row.state();
+
+            if (cur == TxState.NA)
+                checkNa(cur);
+            else if (cur == TxState.PREPARED)
+                checkPrepared(cur);
+            else if (cur == TxState.COMMITTED || cur == TxState.ABORTED)
+                checkFinished(cur);
+            else
+                throw new IllegalStateException("Unknown tx state: " + cur);
+        }
+
+        /** {@inheritDoc} */
+        @Override public TxRow newRow() {
+            if (treeOp != IgniteTree.OperationType.PUT)
+                return null;
+
+            return new TxRow(major, minor, newState);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return treeOp;
+        }
+
+        /**
+         * Checks update possibility for {@code TxState.NA} tx status: only {@code ABORTED} and
+         * {@code PREPARED} are accepted.
+         *
+         * @param currState Current tx state.
+         */
+        private void checkNa(byte currState) {
+            boolean accepted = newState == TxState.ABORTED || newState == TxState.PREPARED;
+
+            if (accepted)
+                valid();
+            else
+                invalid(currState); // This includes COMMITTED. TODO IGNITE-8445
+        }
+
+        /**
+         * Checks update possibility for {@code TxState.PREPARED} status: a repeated {@code PREPARED} is
+         * skipped, {@code ABORTED} and {@code COMMITTED} are accepted.
+         *
+         * @param currState Current tx state.
+         */
+        private void checkPrepared(byte currState) {
+            if (newState == TxState.PREPARED)
+                ignore();
+            else if (newState == TxState.ABORTED || newState == TxState.COMMITTED)
+                valid();
+            else
+                invalid(currState);
+        }
+
+        /**
+         * Checks update possibility for the {@code TxState.COMMITTED} and {@code TxState.ABORTED} statuses.
+         * Repeating the current state is skipped. {@code PREPARED} is skipped on a primary node only
+         * (in case when remote tx has updated the current state before). Anything else is rejected.
+         *
+         * @param currState Current tx state, either {@code TxState.COMMITTED} or {@code TxState.ABORTED}.
+         */
+        private void checkFinished(byte currState) {
+            boolean sameState = newState == currState;
+            boolean latePrepare = newState == TxState.PREPARED && primary;
+
+            if (sameState || latePrepare)
+                ignore();
+            else
+                invalid(currState);
+        }
+
+        /**
+         * Action for valid tx status update.
+         */
+        private void valid() {
+            assert treeOp == null;
+
+            treeOp = IgniteTree.OperationType.PUT;
+        }
+
+        /**
+         * Action for invalid tx status update.
+         *
+         * @param currState Current tx state.
+         */
+        private void invalid(byte currState) {
+            assert treeOp == null;
+
+            String msg = "Unexpected new transaction state. [currState=" + currState +
+                ", newState=" + newState +
+                ", cntr=" + minor + ']';
+
+            throw new IllegalStateException(msg);
+        }
+
+        /**
+         * Action for ignoring tx status update.
+         */
+        private void ignore() {
+            assert treeOp == null;
+
+            treeOp = IgniteTree.OperationType.NOOP;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java
new file mode 100644
index 0000000..e952b43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+/**
+ * Page-level accessor for transaction log items: compares the item stored at a given offset of a page
+ * with a key, and reads or writes its major version, minor version and transaction state.
+ */
+public interface TxLogIO {
+    /**
+     * @param pageAddr Page address.
+     * @param off Item offset.
+     * @param row Row to compare with.
+     * @return Comparison result.
+     */
+    int compare(long pageAddr, int off, TxKey row);
+
+    /**
+     * @param pageAddr Page address.
+     * @param off Item offset.
+     * @return Major version.
+     */
+    long getMajor(long pageAddr, int off);
+
+    /**
+     * @param pageAddr Page address.
+     * @param off Item offset.
+     * @param major Major version.
+     */
+    void setMajor(long pageAddr, int off, long major);
+
+    /**
+     * @param pageAddr Page address.
+     * @param off Item offset.
+     * @return Minor version.
+     */
+    long getMinor(long pageAddr, int off);
+
+    /**
+     * @param pageAddr Page address.
+     * @param off Item offset.
+     * @param minor Minor version.
+     */
+    void setMinor(long pageAddr, int off, long minor);
+
+    /**
+     * @param pageAddr Page address.
+     * @param off Item offset.
+     * @return Transaction state.
+     */
+    byte getState(long pageAddr, int off);
+
+    /**
+     * @param pageAddr Page address.
+     * @param off Item offset.
+     * @param state Transaction state.
+     */
+    void setState(long pageAddr, int off, byte state);
+}