You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2018/05/11 15:49:57 UTC

[1/2] ignite git commit: IGNITE-5874 Store TTL expire times in B+ tree on per-partition basis - Fixes #3231.

Repository: ignite
Updated Branches:
  refs/heads/master 01f60542e -> 89c775737
Updated Tags:  refs/tags/1.0.0-RELEASE-TEST-RC1 [created] a37da05fb
  refs/tags/1.0.0-RELEASE-TEST-RC2 [created] 16ba1421d
  refs/tags/1.0.0-RELEASE-TEST-RC3 [created] 1b3c8184d
  refs/tags/1.0.0-RELEASE-TEST-RC4 [created] d9ab06450
  refs/tags/1.0.1-RELEASE-TEST-RC5 [created] eee94fa28
  refs/tags/1.0.1-RELEASE-TEST-RC6 [created] 4de9e8dc5
  refs/tags/1.10.0.ea1 [created] 0dae7316d
  refs/tags/1.10.0.ea2 [created] ab4808545
  refs/tags/1.10.0.ea3 [created] 62df82653
  refs/tags/1.10.0.ea5 [created] 30460d2a5
  refs/tags/1.10.1.ea4 [created] 8c83d167d
  refs/tags/1.5.1-QAVS1901 [created] 5c8283571
  refs/tags/1.5.10 [created] 4bf9edd68
  refs/tags/1.5.11 [created] 460f0078f
  refs/tags/1.5.12 [created] fb675772d
  refs/tags/1.5.14 [created] 1362553b4
  refs/tags/1.5.15 [created] 78e671138
  refs/tags/1.5.16 [created] b39f54f21
  refs/tags/1.5.17 [created] d4de1cc0f
  refs/tags/1.5.18 [created] 67b8c1ae2
  refs/tags/1.5.19 [created] 953e9db0d
  refs/tags/1.5.20 [created] fca3c78c7
  refs/tags/1.5.21 [created] bd1916bfe
  refs/tags/1.5.22 [created] 597ea0cf5
  refs/tags/1.5.23 [created] f2832c6ff
  refs/tags/1.5.24 [created] 692cdf3e4
  refs/tags/1.5.25 [created] c1a354a7a
  refs/tags/1.5.26 [created] 0d4dbc416
  refs/tags/1.5.27 [created] 847b27814
  refs/tags/1.5.28 [created] 55dbc8a62
  refs/tags/1.5.29 [created] a1980c6f6
  refs/tags/1.5.30 [created] ae7e6cc95
  refs/tags/1.5.31 [created] 767f88bcf
  refs/tags/1.5.32 [created] f57631d98
  refs/tags/1.5.33 [created] 1986b93b2
  refs/tags/1.5.4 [created] db62c7a78
  refs/tags/1.5.5 [created] 8f3ae6bab
  refs/tags/1.5.5-QATEST [created] 6b4e4be3a
  refs/tags/1.5.6 [created] 75591a935
  refs/tags/1.5.7 [created] c9020c405
  refs/tags/1.5.7-QATEST [created] 80b21ebb8
  refs/tags/1.5.7-TEST [created] 57c19c20a
  refs/tags/1.5.8 [created] 210366eb7
  refs/tags/1.5.9 [created] cc595929c
  refs/tags/1.6.1 [created] b418cea04
  refs/tags/1.6.10 [created] f1c424bd2
  refs/tags/1.6.11 [created] babff41f0
  refs/tags/1.6.12 [created] a22010d32
  refs/tags/1.6.2 [created] 072e3b3ce
  refs/tags/1.6.3 [created] 9d0212018
  refs/tags/1.6.5 [created] 62c101cf3
  refs/tags/1.6.6 [created] f14c6fb25
  refs/tags/1.6.7 [created] 8e5ecdde0
  refs/tags/1.6.8 [created] 4ef6b1743
  refs/tags/1.6.8-QAVS1902 [created] 5739b6a1c
  refs/tags/1.6.9 [created] 93723542e
  refs/tags/1.7.1 [created] 3dd286282
  refs/tags/1.7.2 [created] 9e67197e3
  refs/tags/1.7.3 [created] 596479c91
  refs/tags/1.7.4 [created] 3fae2e313
  refs/tags/1.7.4-p1 [created] 0da3c2ed0
  refs/tags/1.7.5 [created] ba3cccc88
  refs/tags/1.8.0-b1 [created] 947266517
  refs/tags/1.8.0.b2 [created] 5b0cedfe4
  refs/tags/1.8.1 [created] 8fe1fc191
  refs/tags/1.8.2 [created] f255ff094
  refs/tags/2.2.4-test [created] e8e0d75d7
  refs/tags/2.2.5-test [created] 36cb161e9


http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
new file mode 100644
index 0000000..6fa039d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.migration;
+
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IndexStorage;
+import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree.WITHOUT_KEY;
+
+/**
+ * Ignite native persistence migration task that upgrades existing PendingTrees to per-partition basis. It ignores
+ * possible assertion errors when a pointer to an entry exists in the tree but the entry itself was removed for some
+ * reason (e.g. when a partition was evicted after restart).
+ *
+ * The task goes through persistent cache groups and copies entries to their partitions' pending trees.
+ */
+public class UpgradePendingTreeToPerPartitionTask implements IgniteCallable<Boolean> {
+    /** */
+    private static final String PENDING_ENTRIES_TREE_NAME = "PendingEntries";
+
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final int BATCH_SIZE = 500;
+
+    /** */
+    @IgniteInstanceResource
+    private IgniteEx node;
+
+    /** */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override public Boolean call() throws IgniteException {
+        GridCacheSharedContext<Object, Object> sharedCtx = node.context().cache().context();
+
+        for (CacheGroupContext grp : sharedCtx.cache().cacheGroups()) {
+            if (!grp.persistenceEnabled() || !grp.affinityNode()) {
+                if (!grp.persistenceEnabled())
+                    log.info("Skip pending tree upgrade for non-persistent cache group: [grpId=" + grp.groupId() +
+                        ", grpName=" + grp.name() + ']');
+                else
+                    log.info("Skip pending tree upgrade on non-affinity node for cache group: [grpId=" + grp.groupId() +
+                        ", grpName=" + grp.name() + ']');
+
+                continue;
+            }
+
+            try {
+                processCacheGroup(grp);
+            }
+            catch (Exception ex) {
+                if (Thread.interrupted() || X.hasCause(ex, InterruptedException.class))
+                    log.info("Upgrade pending tree has been cancelled.");
+                else
+                    log.warning("Failed to upgrade pending tree for cache group:  [grpId=" + grp.groupId() +
+                        ", grpName=" + grp.name() + ']', ex);
+
+                return false;
+            }
+
+            if (Thread.interrupted()) {
+                log.info("Upgrade pending tree has been cancelled.");
+
+                return false;
+            }
+        }
+
+        log.info("All pending trees upgraded successfully.");
+
+        return true;
+    }
+
+    /**
+     * Converts CacheGroup pending tree to per-partition basis.
+     *
+     * @param grp Cache group.
+     * @throws IgniteCheckedException If error occurs.
+     */
+    private void processCacheGroup(CacheGroupContext grp) throws IgniteCheckedException {
+        assert grp.offheap() instanceof GridCacheOffheapManager;
+
+        PendingEntriesTree oldPendingTree;
+
+        final IgniteCacheDatabaseSharedManager db = grp.shared().database();
+
+        db.checkpointReadLock();
+        try {
+            IndexStorage indexStorage = ((GridCacheOffheapManager)grp.offheap()).getIndexStorage();
+
+            //TODO: IGNITE-5874: replace with some check-method to avoid unnecessary page allocation.
+            RootPage pendingRootPage = indexStorage.getOrAllocateForTree(PENDING_ENTRIES_TREE_NAME);
+
+            if (pendingRootPage.isAllocated()) {
+                log.info("No pending tree found for cache group: [grpId=" + grp.groupId() +
+                    ", grpName=" + grp.name() + ']');
+
+                // Nothing to do here as just allocated tree is obviously empty.
+                indexStorage.dropRootPage(PENDING_ENTRIES_TREE_NAME);
+
+                return;
+            }
+
+            oldPendingTree = new PendingEntriesTree(
+                grp,
+                PENDING_ENTRIES_TREE_NAME,
+                grp.dataRegion().pageMemory(),
+                pendingRootPage.pageId().pageId(),
+                ((GridCacheOffheapManager)grp.offheap()).reuseListForIndex(null),
+                false
+            );
+        }
+        finally {
+            db.checkpointReadUnlock();
+        }
+
+        processPendingTree(grp, oldPendingTree);
+
+        if (Thread.currentThread().isInterrupted())
+            return;
+
+        db.checkpointReadLock();
+        try {
+            oldPendingTree.destroy();
+        }
+        finally {
+            db.checkpointReadUnlock();
+        }
+    }
+
+    /**
+     * Move pending rows for CacheGroup entries to per-partition PendingTree.
+     * Invalid pending rows will be ignored.
+     *
+     * @param grp Cache group.
+     * @param oldPendingEntries Old-style PendingTree.
+     * @throws IgniteCheckedException If error occurs.
+     */
+    private void processPendingTree(CacheGroupContext grp, PendingEntriesTree oldPendingEntries)
+        throws IgniteCheckedException {
+        final PageMemory pageMemory = grp.dataRegion().pageMemory();
+
+        final IgniteCacheDatabaseSharedManager db = grp.shared().database();
+
+        final Set<Integer> cacheIds = grp.cacheIds();
+
+        PendingRow row = null;
+
+        int processedEntriesCnt = 0;
+        int skippedEntries = 0;
+
+        // Re-acquire checkpoint lock for every next batch.
+        while (!Thread.currentThread().isInterrupted()) {
+            int cnt = 0;
+
+            db.checkpointReadLock();
+            try {
+                // Resume from the last visited row (null on the first batch); every visited row is removed below.
+                GridCursor<PendingRow> cursor = oldPendingEntries.find(row, null, WITHOUT_KEY);
+
+                while (cnt++ < BATCH_SIZE && cursor.next()) {
+                    row = cursor.get();
+
+                    assert row.link != 0 && row.expireTime != 0 : row;
+
+                    GridCacheEntryEx entry;
+
+                    // Lost cache or lost entry.
+                    if (!cacheIds.contains(row.cacheId) || (entry = getEntry(grp, row)) == null) {
+                        skippedEntries++;
+
+                        oldPendingEntries.removex(row);
+
+                        continue;
+                    }
+
+                    entry.lockEntry();
+                    try {
+                        if (processRow(pageMemory, grp, row))
+                            processedEntriesCnt++;
+                        else
+                            skippedEntries++;
+                    }
+                    finally {
+                        entry.unlockEntry();
+                    }
+
+                    oldPendingEntries.removex(row);
+                }
+
+                if (cnt < BATCH_SIZE)
+                    break;
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+        }
+
+        log.info("PendingTree upgraded: " +
+            "[grpId=" + grp.groupId() +
+            ", grpName=" + grp.name() +
+            ", processedEntries=" + processedEntriesCnt +
+            ", failedEntries=" + skippedEntries +
+            ']');
+    }
+
+    /**
+     * Returns the cache entry referenced by a pending row, so the caller can lock it while the row is moved.
+     *
+     * @param grp Cache group.
+     * @param row Pending row.
+     * @return CacheEntry if found or null otherwise.
+     * @throws IgniteException If the thread is interrupted or the failure was caused by an interruption.
+     */
+    private GridCacheEntryEx getEntry(CacheGroupContext grp, PendingRow row) {
+        try {
+            CacheDataRowAdapter rowData = new CacheDataRowAdapter(row.link);
+
+            rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
+
+            GridCacheContext cctx = grp.shared().cacheContext(row.cacheId);
+
+            assert cctx != null;
+
+            return cctx.cache().entryEx(rowData.key());
+        }
+        catch (Throwable ex) {
+            if (Thread.currentThread().isInterrupted() || X.hasCause(ex, InterruptedException.class)) {
+                // Restore the interrupt flag and keep the original cause, consistent with processRow().
+                Thread.currentThread().interrupt();
+
+                throw new IgniteException(ex);
+            }
+
+            log.warning("Failed to move old-version pending entry " +
+                "to per-partition PendingTree: key not found (skipping): " +
+                "[grpId=" + grp.groupId() +
+                ", grpName=" + grp.name() +
+                ", pendingRow=" + row + "]");
+
+            return null;
+        }
+    }
+
+    /**
+     * Validates PendingRow and adds it to per-partition PendingTree.
+     *
+     * @param pageMemory Page memory.
+     * @param grp Cache group.
+     * @param row Pending row.
+     * @return {@code True} if pending row successfully moved, {@code False} otherwise.
+     */
+    private boolean processRow(PageMemory pageMemory, CacheGroupContext grp, PendingRow row) {
+        final long pageId = PageIdUtils.pageId(row.link);
+
+        final int partition = PageIdUtils.partId(pageId);
+
+        assert partition >= 0;
+
+        try {
+            // acquirePage() pins the page; it must always be paired with releasePage() or the pin leaks.
+            final long page = pageMemory.acquirePage(grp.groupId(), pageId);
+
+            try {
+                long pageAddr = pageMemory.readLock(grp.groupId(), pageId, page);
+
+                // NOTE(review): a zero address means the read lock was not taken (e.g. the page was reused since the
+                // link was written), so there is nothing to read and nothing to unlock.
+                if (pageAddr == 0L) {
+                    log.warning("Failed to move old-version pending entry " +
+                        "to per-partition PendingTree: failed to lock data page (skipping): " +
+                        "[grpId=" + grp.groupId() +
+                        ", grpName=" + grp.name() +
+                        ", partId=" + partition +
+                        ", pendingRow=" + row + "]");
+
+                    return false;
+                }
+
+                try {
+                    assert PageIO.getType(pageAddr) != 0;
+                    assert PageIO.getVersion(pageAddr) != 0;
+
+                    IgniteCacheOffheapManager.CacheDataStore store =
+                        ((GridCacheOffheapManager)grp.offheap()).dataStore(partition);
+
+                    if (store == null) {
+                        log.warning("Failed to move old-version pending entry " +
+                            "to per-partition PendingTree: Node has no partition anymore (skipping): " +
+                            "[grpId=" + grp.groupId() +
+                            ", grpName=" + grp.name() +
+                            ", partId=" + partition +
+                            ", pendingRow=" + row + "]");
+
+                        return false;
+                    }
+
+                    assert store instanceof GridCacheOffheapManager.GridCacheDataStore;
+                    assert store.pendingTree() != null;
+
+                    store.pendingTree().invoke(row, WITHOUT_KEY, new PutIfAbsentClosure(row));
+                }
+                finally {
+                    pageMemory.readUnlock(grp.groupId(), pageId, page);
+                }
+            }
+            finally {
+                pageMemory.releasePage(grp.groupId(), pageId, page);
+            }
+        }
+        catch (AssertionError | Exception ex) {
+            if (Thread.currentThread().isInterrupted() || X.hasCause(ex, InterruptedException.class)) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteException(ex);
+            }
+
+            String msg = "Unexpected error occurs while moving old-version pending entry " +
+                "to per-partition PendingTree. Seems page doesn't longer exists (skipping): " +
+                "[grpId=" + grp.groupId() +
+                ", grpName=" + grp.name() +
+                ", partId=" + partition +
+                ", pendingRow=" + row + ']';
+
+            if (log.isDebugEnabled())
+                log.warning(msg, ex);
+            else
+                log.warning(msg);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** Tree closure that puts the given row only if the tree does not contain a matching row yet. */
+    private static class PutIfAbsentClosure implements IgniteTree.InvokeClosure<PendingRow> {
+        /** */
+        private final PendingRow pendingRow;
+
+        /** */
+        private IgniteTree.OperationType op;
+
+        /** */
+        PutIfAbsentClosure(PendingRow pendingRow) {
+            this.pendingRow = pendingRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable PendingRow oldRow) throws IgniteCheckedException {
+            op = (oldRow == null) ? IgniteTree.OperationType.PUT : IgniteTree.OperationType.NOOP;
+        }
+
+        /** {@inheritDoc} */
+        @Override public PendingRow newRow() {
+            return pendingRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return op;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index a5236c2..c940c39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -191,7 +191,6 @@ public abstract class PageIO {
     /** */
     public static final short T_DATA_REF_METASTORAGE_LEAF = 23;
 
-
     /** Index for payload == 1. */
     public static final short T_H2_EX_REF_LEAF_START = 10000;
 
@@ -215,8 +214,8 @@ public abstract class PageIO {
      * @param ver Page format version.
      */
     protected PageIO(int type, int ver) {
-        assert ver > 0 && ver < 65535: ver;
-        assert type > 0 && type < 65535: type;
+        assert ver > 0 && ver < 65535 : ver;
+        assert type > 0 && type < 65535 : type;
 
         this.type = type;
         this.ver = ver;
@@ -245,7 +244,7 @@ public abstract class PageIO {
     public static void setType(long pageAddr, int type) {
         PageUtils.putShort(pageAddr, TYPE_OFF, (short)type);
 
-        assert getType(pageAddr) == type;
+        assert getType(pageAddr) == type : getType(pageAddr);
     }
 
     /**
@@ -268,7 +267,7 @@ public abstract class PageIO {
      * @param pageAddr Page address.
      * @param ver Version.
      */
-    private static void setVersion(long pageAddr, int ver) {
+    protected static void setVersion(long pageAddr, int ver) {
         PageUtils.putShort(pageAddr, VER_OFF, (short)ver);
 
         assert getVersion(pageAddr) == ver;
@@ -580,7 +579,7 @@ public abstract class PageIO {
      * @param pageSize Page size.
      * @param sb Sb.
      */
-    protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException ;
+    protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException;
 
     /**
      * @param addr Address.

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
index 3d79884..fe6b7a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
@@ -42,9 +42,13 @@ public class PagePartitionMetaIO extends PageMetaIO {
     /** */
     private static final int NEXT_PART_META_PAGE_OFF = PARTITION_STATE_OFF + 1;
 
+    /** End of page partition meta. */
+    static final int END_OF_PARTITION_PAGE_META = NEXT_PART_META_PAGE_OFF + 8;
+
     /** */
     public static final IOVersions<PagePartitionMetaIO> VERSIONS = new IOVersions<>(
-        new PagePartitionMetaIO(1)
+        new PagePartitionMetaIO(1),
+        new PagePartitionMetaIOV2(2)
     );
 
     /** {@inheritDoc} */
@@ -150,6 +154,7 @@ public class PagePartitionMetaIO extends PageMetaIO {
 
     /**
      * Returns partition counters page identifier, page with caches in cache group sizes.
+     *
      * @param pageAddr Partition metadata page address.
      * @return Next meta partial page ID or {@code 0} if it does not exist.
      */
@@ -167,19 +172,39 @@ public class PagePartitionMetaIO extends PageMetaIO {
         PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, cntrsPageId);
     }
 
+    /**
+     * Returns partition pending tree root. Pending tree is used to track expiring entries.
+     *
+     * @param pageAddr Page address.
+     * @return Pending tree root page. Not supported by this IO version: always throws {@link UnsupportedOperationException}.
+     */
+    public long getPendingTreeRoot(long pageAddr) {
+        throw new UnsupportedOperationException("Per partition pending tree is not supported by " +
+            "this PagePartitionMetaIO version: ver=" + getVersion());
+    }
+
+    /**
+     * Sets new partition pending tree root. Not supported by this IO version: always throws {@link UnsupportedOperationException}.
+     *
+     * @param pageAddr Page address.
+     * @param treeRoot Pending tree root.
+     */
+    public void setPendingTreeRoot(long pageAddr, long treeRoot) {
+        throw new UnsupportedOperationException("Per partition pending tree is not supported by " +
+            "this PagePartitionMetaIO version: ver=" + getVersion());
+    }
+
     /** {@inheritDoc} */
     @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
         super.printPage(pageAddr, pageSize, sb);
 
         byte state = getPartitionState(pageAddr);
 
-        sb
-            .a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr))
+        sb.a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr))
             .a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr))
             .a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr))
             .a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")")
             .a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr))
-            .a("\n]")
-            ;
+            .a("\n]");
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
new file mode 100644
index 0000000..70556a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.util.GridStringBuilder;
+
+/**
+ * IO for partition metadata pages (version 2).
+ * Persistent partition contains its own PendingTree, whose root is stored in this page.
+ */
+public class PagePartitionMetaIOV2 extends PagePartitionMetaIO {
+    /** Offset of the pending tree root, placed right after the fields of {@link PagePartitionMetaIO}. */
+    private static final int PENDING_TREE_ROOT_OFF = PagePartitionMetaIO.END_OF_PARTITION_PAGE_META;
+
+    /**
+     * @param ver Version.
+     */
+    public PagePartitionMetaIOV2(int ver) {
+        super(ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initNewPage(long pageAddr, long pageId, int pageSize) {
+        super.initNewPage(pageAddr, pageId, pageSize);
+
+        setPendingTreeRoot(pageAddr, 0L);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getPendingTreeRoot(long pageAddr) {
+        return PageUtils.getLong(pageAddr, PENDING_TREE_ROOT_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setPendingTreeRoot(long pageAddr, long treeRoot) {
+        PageUtils.putLong(pageAddr, PENDING_TREE_ROOT_OFF, treeRoot);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
+        byte state = getPartitionState(pageAddr);
+
+        sb.a("PagePartitionMeta[\n\ttreeRoot=").a(getReuseListRoot(pageAddr));
+        sb.a(",\n\tpendingTreeRoot=").a(getPendingTreeRoot(pageAddr));
+        sb.a(",\n\tlastSuccessfulFullSnapshotId=").a(getLastSuccessfulFullSnapshotId(pageAddr));
+        sb.a(",\n\tlastSuccessfulSnapshotId=").a(getLastSuccessfulSnapshotId(pageAddr));
+        sb.a(",\n\tnextSnapshotTag=").a(getNextSnapshotTag(pageAddr));
+        sb.a(",\n\tlastSuccessfulSnapshotTag=").a(getLastSuccessfulSnapshotTag(pageAddr));
+        sb.a(",\n\tlastAllocatedPageCount=").a(getLastAllocatedPageCount(pageAddr));
+        sb.a(",\n\tcandidatePageCount=").a(getCandidatePageCount(pageAddr));
+        sb.a(",\n\tsize=").a(getSize(pageAddr));
+        sb.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr));
+        sb.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr));
+        sb.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")");
+        sb.a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr));
+        sb.a("\n]");
+    }
+
+    /**
+     * Upgrades an older-version partition meta page to this version in place and resets its pending tree root to 0.
+     *
+     * @param pageAddr Page address.
+     */
+    public void upgradePage(long pageAddr) {
+        assert PageIO.getType(pageAddr) == getType();
+        assert PageIO.getVersion(pageAddr) < 2;
+
+        PageIO.setVersion(pageAddr, getVersion());
+        setPendingTreeRoot(pageAddr, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
index 43a2303..ebe6f29 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -55,10 +55,11 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac
         Mockito.when(topologyMock.partitions()).thenReturn(3);
 
         List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
-                partitionMock(0, 1, 1),
-                partitionMock(1, 2, 2),
-                partitionMock(2, 3, 3)
+            partitionMock(0, 1, 1),
+            partitionMock(1, 2, 2),
+            partitionMock(2, 3, 3)
         );
+
         Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
         Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
     }
@@ -82,10 +83,13 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac
      */
     private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) {
         GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+
         if (countersMap != null)
             msg.addPartitionUpdateCounters(0, countersMap);
+
         if (sizesMap != null)
             msg.addPartitionSizes(0, sizesMap);
+
         return msg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
index 99614ed..a02ed11 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
@@ -17,32 +17,35 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
+import java.io.Serializable;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
- *
+ * Caused by https://issues.apache.org/jira/browse/IGNITE-7278
  */
 public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     /** */
@@ -52,7 +55,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     private static final int ENTRIES_COUNT = 10_000;
 
     /** */
-    public static final String CACHE_NAME = "cache1";
+    protected static final String CACHE_NAME = "cache1";
 
     /** Checkpoint delay. */
     private volatile int checkpointDelay = -1;
@@ -79,21 +82,23 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
 
         DataStorageConfiguration memCfg = new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(
-                new DataRegionConfiguration().setMaxSize(400 * 1024 * 1024).setPersistenceEnabled(true))
+                new DataRegionConfiguration()
+                    .setMaxSize(400 * 1024 * 1024)
+                    .setPersistenceEnabled(true))
             .setWalMode(WALMode.LOG_ONLY)
             .setCheckpointFrequency(checkpointDelay);
 
         cfg.setDataStorageConfiguration(memCfg);
 
-        CacheConfiguration ccfg1 = new CacheConfiguration();
+        CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg1.setName(CACHE_NAME);
-        ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-        ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 128));
-        ccfg1.setBackups(2);
+        ccfg.setName(CACHE_NAME);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+        ccfg.setBackups(2);
 
-        cfg.setCacheConfiguration(ccfg1);
+        cfg.setCacheConfiguration(ccfg);
 
         return cfg;
     }
@@ -197,7 +202,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
      * @throws Exception if failed.
      */
     public void testRebalncingDuringLoad_10_10_1_1() throws Exception {
@@ -205,7 +209,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
      * @throws Exception if failed.
      */
     public void testRebalncingDuringLoad_10_500_8_16() throws Exception {
@@ -227,7 +230,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
 
         final Ignite load = ignite(0);
 
-        load.active(true);
+        load.cluster().active(true);
 
         try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) {
             s.allowOverwrite(true);
@@ -245,10 +248,13 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
                 Random rnd = ThreadLocalRandom.current();
 
                 while (!done.get()) {
-                    Map<Integer, Integer> map = new TreeMap<>();
+                    Map<Integer, Person> map = new TreeMap<>();
 
-                    for (int i = 0; i < batch; i++)
-                        map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt());
+                    for (int i = 0; i < batch; i++) {
+                        int key = rnd.nextInt(ENTRIES_COUNT);
+
+                        map.put(key, new Person("fn" + key, "ln" + key));
+                    }
 
                     cache.putAll(map);
                 }
@@ -277,4 +283,51 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
 
         busyFut.get();
     }
+
+    /**
+     * Value object that the concurrent load puts into the cache.
+     */
+    static class Person implements Serializable {
+        /** First name. */
+        @GridToStringInclude
+        @QuerySqlField(index = true, groups = "full_name")
+        private String fName;
+
+        /** Last name. */
+        @GridToStringInclude
+        @QuerySqlField(index = true, groups = "full_name")
+        private String lName;
+
+        /**
+         * @param fName First name.
+         * @param lName Last name.
+         */
+        public Person(String fName, String lName) {
+            this.fName = fName;
+            this.lName = lName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person)o;
+
+            return Objects.equals(fName, person.fName) && Objects.equals(lName, person.lName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(fName, lName);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
deleted file mode 100644
index 66b2047..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Cause by https://issues.apache.org/jira/browse/IGNITE-7278
- */
-public class IgnitePdsContinuousRestartTest2 extends GridCommonAbstractTest {
-    /** */
-    private static final int GRID_CNT = 4;
-
-    /** */
-    private static final int ENTRIES_COUNT = 10_000;
-
-    /** */
-    public static final String CACHE_NAME = "cache1";
-
-    /** Checkpoint delay. */
-    private volatile int checkpointDelay = -1;
-
-    /** */
-    private boolean cancel;
-
-    /**
-     * Default constructor.
-     */
-    public IgnitePdsContinuousRestartTest2() {
-
-    }
-
-    /**
-     * @param cancel Cancel.
-     */
-    public IgnitePdsContinuousRestartTest2(boolean cancel) {
-        this.cancel = cancel;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(
-                new DataRegionConfiguration()
-                    .setMaxSize(400 * 1024 * 1024)
-                    .setPersistenceEnabled(true))
-            .setWalMode(WALMode.LOG_ONLY)
-            .setCheckpointFrequency(checkpointDelay);
-
-        cfg.setDataStorageConfiguration(memCfg);
-
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setName(CACHE_NAME);
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
-        ccfg.setBackups(2);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_500_1_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 500, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_500_1_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 500, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_20000_1_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 20000, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_8000_1_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 8000, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_500_8_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 500, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_500_8_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 500, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_20000_8_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 20000, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_8000_8_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 8000, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_500_8_16() throws Exception {
-        checkRebalancingDuringLoad(1000, 500, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_500_8_16() throws Exception {
-        checkRebalancingDuringLoad(8000, 500, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_20000_8_16() throws Exception {
-        checkRebalancingDuringLoad(1000, 20000, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_8000_8_16() throws Exception {
-        checkRebalancingDuringLoad(8000, 8000, 8, 16);
-    }
-
-    /**
-     *
-     * @throws Exception if failed.
-     */
-    public void testRebalncingDuringLoad_10_10_1_1() throws Exception {
-        checkRebalancingDuringLoad(10, 10, 1, 1);
-    }
-
-    /**
-     *
-     * @throws Exception if failed.
-     */
-    public void testRebalncingDuringLoad_10_500_8_16() throws Exception {
-        checkRebalancingDuringLoad(10, 500, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    private void checkRebalancingDuringLoad(
-        int restartDelay,
-        int checkpointDelay,
-        int threads,
-        final int batch
-    ) throws Exception {
-        this.checkpointDelay = checkpointDelay;
-
-        startGrids(GRID_CNT);
-
-        final Ignite load = ignite(0);
-
-        load.cluster().active(true);
-
-        try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) {
-            s.allowOverwrite(true);
-
-            for (int i = 0; i < ENTRIES_COUNT; i++)
-                s.addData(i, i);
-        }
-
-        final AtomicBoolean done = new AtomicBoolean(false);
-
-        IgniteInternalFuture<?> busyFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-            /** {@inheritDoc} */
-            @Override public Object call() throws Exception {
-                IgniteCache<Object, Object> cache = load.cache(CACHE_NAME);
-                Random rnd = ThreadLocalRandom.current();
-
-                while (!done.get()) {
-                    Map<Integer, Integer> map = new TreeMap<>();
-
-                    for (int i = 0; i < batch; i++)
-                        map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt());
-
-                    cache.putAll(map);
-                }
-
-                return null;
-            }
-        }, threads, "updater");
-
-        long end = System.currentTimeMillis() + 90_000;
-
-        Random rnd = ThreadLocalRandom.current();
-
-        while (System.currentTimeMillis() < end) {
-            int idx = rnd.nextInt(GRID_CNT - 1) + 1;
-
-            stopGrid(idx, cancel);
-
-            U.sleep(restartDelay);
-
-            startGrid(idx);
-
-            U.sleep(restartDelay);
-        }
-
-        done.set(true);
-
-        busyFut.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
new file mode 100644
index 0000000..d5b3f55
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+/**
+ * Runs {@link IgnitePdsContinuousRestartTest} against a cache with a 1 second created expiry policy.
+ * Caused by https://issues.apache.org/jira/browse/IGNITE-5879
+ */
+public class IgnitePdsContinuousRestartTestWithExpiryPolicy extends IgnitePdsContinuousRestartTest {
+    /** IP finder shared by every node started by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Passes {@code false} to the parent constructor.
+     */
+    public IgnitePdsContinuousRestartTestWithExpiryPolicy() {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        // Replace the parent's cache configuration with one that has a cache group and an expiry policy.
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setGroupName("Group1");
+        cacheCfg.setBackups(2);
+        cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)));
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
index 1825666..03dc445 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
@@ -33,7 +33,7 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration()
-                    .setMaxSize(200 * 1024 * 1024)
+                    .setMaxSize(256 * 1024 * 1024)
                     .setPersistenceEnabled(true))
             .setWalMode(WALMode.LOG_ONLY));
 
@@ -41,6 +41,13 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs
     }
 
     /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        cleanPersistenceDir(); // Wipe persistence files once the whole test class is done.
+    }
+
+    /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 4;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
new file mode 100644
index 0000000..be09e70
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import com.google.common.base.Strings;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests the TTL worker with persistence enabled.
+ */
+public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
+    /** Name of the expirable cache. */
+    public static final String CACHE = "expirableCache";
+
+    /** Expiry policy duration, in seconds. */
+    private static final int EXPIRATION_TIMEOUT = 10;
+
+    /** Number of entries loaded into the cache. */
+    public static final int ENTRIES = 7000;
+
+    /** IP finder shared by all nodes started by this test. */
+    private static final TcpDiscoveryVmIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        // Stop grids even if the test failed before stopping them itself.
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+        disco.setIpFinder(FINDER);
+        cfg.setDiscoverySpi(disco);
+
+        final CacheConfiguration ccfg = new CacheConfiguration();
+        ccfg.setName(CACHE);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setEagerTtl(true);
+        ccfg.setGroupName("group1");
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(256L * 1024 * 1024)
+                        .setPersistenceEnabled(true)
+                ).setWalMode(WALMode.DEFAULT));
+
+        cfg.setCacheConfiguration(ccfg);
+        return cfg;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testTtlIsApplied() throws Exception {
+        loadAndWaitForCleanup(false);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testTtlIsAppliedAfterRestart() throws Exception {
+        loadAndWaitForCleanup(true);
+    }
+
+    /**
+     * @param restartGrid Whether to restart the node after the cache is filled.
+     * @throws Exception if failed.
+     */
+    private void loadAndWaitForCleanup(boolean restartGrid) throws Exception {
+        IgniteEx srv = startGrid(0);
+        srv.cluster().active(true);
+
+        fillCache(srv.cache(CACHE));
+
+        if (restartGrid) {
+            stopGrid(0);
+            srv = startGrid(0);
+            srv.cluster().active(true);
+        }
+
+        final IgniteCache<Integer, String> cache = srv.cache(CACHE);
+
+        printStatistics((IgniteCacheProxy)cache, "After restart from LFS");
+
+        waitAndCheckExpired(cache);
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRebalancingWithTtlExpirable() throws Exception {
+        IgniteEx srv = startGrid(0);
+        srv.cluster().active(true);
+
+        fillCache(srv.cache(CACHE));
+
+        // Causes rebalancing to start.
+        srv = startGrid(1);
+
+        final IgniteCache<Integer, String> cache = srv.cache(CACHE);
+
+        printStatistics((IgniteCacheProxy)cache, "After rebalancing start");
+
+        waitAndCheckExpired(cache);
+
+        stopAllGrids();
+    }
+
+    /** Fills the cache with {@link #ENTRIES} entries and then reads each of them once. */
+    protected void fillCache(IgniteCache<Integer, String> cache) {
+        TreeMap<Integer, String> entries = new TreeMap<>();
+        for (int i = 0; i < ENTRIES; i++)
+            entries.put(i, Strings.repeat("Some value " + i, 125));
+        cache.putAll(entries);
+
+        // Touch entries.
+        for (int i = 0; i < ENTRIES; i++)
+            cache.get(i);
+
+        printStatistics((IgniteCacheProxy)cache, "After cache puts");
+    }
+
+    /** Waits until the TTL worker empties the cache, then checks that no entry is readable. */
+    protected void waitAndCheckExpired(final IgniteCache<Integer, String> cache) throws IgniteInterruptedCheckedException {
+        boolean cleared = GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return cache.size() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1));
+
+        printStatistics((IgniteCacheProxy)cache, "After timeout");
+        assertTrue("Cache was not cleared by TTL worker [size=" + cache.size() + ']', cleared);
+        for (int i = 0; i < ENTRIES; i++)
+            assertNull(cache.get(i));
+    }
+
+    /** Prints memory statistics of the cache context, framed by {@code msg}. */
+    private void printStatistics(IgniteCacheProxy cache, String msg) {
+        System.out.println(msg + " {{");
+        cache.context().printMemoryStats();
+        System.out.println("}} " + msg);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 1e32320..ab81d8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicC
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsDataRegionMetricsTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.DefaultPageSizeBackwardsCompatibilityTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsPageReplacementTest;
@@ -111,6 +112,7 @@ public class IgnitePdsTestSuite extends TestSuite {
         // TODO uncomment when https://issues.apache.org/jira/browse/IGNITE-7510 is fixed
         // suite.addTestSuite(IgnitePdsClientNearCachePutGetTest.class);
         suite.addTestSuite(IgniteDbPutGetWithCacheStoreTest.class);
+        suite.addTestSuite(IgnitePdsWithTtlTest.class);
 
         suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 3f6f713..76cfe4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,7 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest2;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedCacheDataTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest;
@@ -90,7 +90,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         // Rebalancing test
         suite.addTestSuite(IgnitePdsContinuousRestartTest.class);
-        suite.addTestSuite(IgnitePdsContinuousRestartTest2.class);
+        suite.addTestSuite(IgnitePdsContinuousRestartTestWithExpiryPolicy.class);
 
         suite.addTestSuite(IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class);
 
@@ -115,7 +115,6 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgnitePdsWholeClusterRestartTest.class);
 
-
         // Rebalancing test
         suite.addTestSuite(IgniteWalHistoryReservationsTest.class);
 


[2/2] ignite git commit: IGNITE-5874 Store TTL expire times in B+ tree on per-partition basis - Fixes #3231.

Posted by ir...@apache.org.
IGNITE-5874 Store TTL expire times in B+ tree on per-partition basis - Fixes #3231.

Signed-off-by: Ivan Rakov <ir...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89c77573
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89c77573
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89c77573

Branch: refs/heads/master
Commit: 89c775737936645eaf739b494cc1740cd9605095
Parents: 01f6054
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri May 11 18:45:38 2018 +0300
Committer: Ivan Rakov <ir...@apache.org>
Committed: Fri May 11 18:45:38 2018 +0300

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     |  21 -
 .../PdsWithTtlCompatibilityTest.java            | 191 +++++++++
 .../IgniteCompatibilityBasicTestSuite.java      |   3 +
 .../apache/ignite/IgniteSystemProperties.java   |  11 +
 .../delta/MetaPageUpdateLastAllocatedIndex.java |   6 +-
 .../processors/cache/CacheGroupContext.java     |  16 +-
 .../processors/cache/GridCacheMapEntry.java     |  41 +-
 .../cache/IgniteCacheOffheapManager.java        |  21 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 128 +++---
 .../distributed/dht/GridDhtLocalPartition.java  | 156 +++----
 .../dht/GridDhtPartitionsStateValidator.java    |   1 +
 .../GridDhtPartitionsExchangeFuture.java        |  14 +-
 .../persistence/GridCacheOffheapManager.java    | 415 ++++++++++++++-----
 .../UpgradePendingTreeToPerPartitionTask.java   | 380 +++++++++++++++++
 .../cache/persistence/tree/io/PageIO.java       |  11 +-
 .../tree/io/PagePartitionMetaIO.java            |  35 +-
 .../tree/io/PagePartitionMetaIOV2.java          |  90 ++++
 ...idCachePartitionsStateValidatorSelfTest.java |  10 +-
 .../IgnitePdsContinuousRestartTest.java         |  89 +++-
 .../IgnitePdsContinuousRestartTest2.java        | 281 -------------
 ...dsContinuousRestartTestWithExpiryPolicy.java |  67 +++
 .../IgniteBaselineAbstractFullApiSelfTest.java  |   9 +-
 .../persistence/db/IgnitePdsWithTtlTest.java    | 197 +++++++++
 .../ignite/testsuites/IgnitePdsTestSuite.java   |   2 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   5 +-
 25 files changed, 1582 insertions(+), 618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 0285f3a..96391e9 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -978,27 +978,10 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         assertCacheOperation(ret, true);
     }
 
-    /** */
-    private void failIgnite_5874() {
-        DataStorageConfiguration dsCfg = ignite(0).configuration().getDataStorageConfiguration();
-
-        if (dsCfg.getDefaultDataRegionConfiguration().isPersistenceEnabled())
-            fail("IGNITE-5874");
-
-        if (!F.isEmpty(dsCfg.getDataRegionConfigurations())) {
-            for (DataRegionConfiguration dataRegCfg : dsCfg.getDataRegionConfigurations()) {
-                if (dataRegCfg.isPersistenceEnabled())
-                    fail("IGNITE-5874");
-            }
-        }
-    }
-
     /**
      * @throws Exception If failed.
      */
     public void testPutWithExpiration() throws Exception {
-        failIgnite_5874();
-
         String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_PUT,
             "key", "putKey",
             "val", "putVal",
@@ -1035,8 +1018,6 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
      * @throws Exception If failed.
      */
     public void testAddWithExpiration() throws Exception {
-        failIgnite_5874();
-
         String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_ADD,
             "key", "addKey",
             "val", "addVal",
@@ -1176,8 +1157,6 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
      * @throws Exception If failed.
      */
     public void testReplaceWithExpiration() throws Exception {
-        failIgnite_5874();
-
         jcache().put("replaceKey", "replaceVal");
 
         assertEquals("replaceVal", jcache().get("replaceKey"));

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
new file mode 100644
index 0000000..f3649f6
--- /dev/null
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compatibility;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import javax.cache.Cache;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.compatibility.persistence.IgnitePersistenceCompatibilityAbstractTest;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test PendingTree upgrading to per-partition basis. Test fill cache with persistence enabled and with ExpirePolicy
+ * configured on ignite-2.1 version and check if entries will be correctly expired when a new version node started.
+ *
+ * Note: Test for ignite-2.3 version will always fails due to entry ttl update fails with assertion on checkpoint lock
+ * check.
+ */
+public class PdsWithTtlCompatibilityTest extends IgnitePersistenceCompatibilityAbstractTest {
+    /** */
+    static final String TEST_CACHE_NAME = PdsWithTtlCompatibilityTest.class.getSimpleName();
+
+    /** */
+    static final int DURATION_SEC = 10;
+
+    /** */
+    private static final int ENTRIES_CNT = 100;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(32L * 1024 * 1024)
+                        .setPersistenceEnabled(true)
+                ).setWalMode(WALMode.LOG_ONLY));
+
+        return cfg;
+    }
+
+    /**
+     * Tests opportunity to read data from previous Ignite DB version.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeStartByOldVersionPersistenceData_2_1() throws Exception {
+        doTestStartupWithOldVersion("2.1.0");
+    }
+
+    /**
+     * Tests opportunity to read data from previous Ignite DB version.
+     *
+     * @param igniteVer 3-digits version of ignite
+     * @throws Exception If failed.
+     */
+    protected void doTestStartupWithOldVersion(String igniteVer) throws Exception {
+        try {
+            startGrid(1, igniteVer, new ConfigurationClosure(), new PostStartupClosure());
+
+            stopAllGrids();
+
+            IgniteEx ignite = startGrid(0);
+
+            assertEquals(1, ignite.context().discovery().topologyVersion());
+
+            ignite.active(true);
+
+            validateResultingCacheData(ignite, ignite.cache(TEST_CACHE_NAME));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param cache to be filled by different keys and values. Results may be validated in {@link
+     * #validateResultingCacheData(Ignite, IgniteCache)}.
+     */
+    public static void saveCacheData(Cache<Object, Object> cache) {
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            cache.put(i, "data-" + i);
+
+        //Touch
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            assertNotNull(cache.get(i));
+    }
+
+    /**
+     * Asserts cache contained all expected values as it was saved before.
+     *
+     * @param cache cache should be filled using {@link #saveCacheData(Cache)}.
+     */
+    public static void validateResultingCacheData(Ignite ignite,
+        IgniteCache<Object, Object> cache) throws IgniteInterruptedCheckedException {
+
+        final long expireTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(DURATION_SEC + 1);
+
+        final IgniteFuture<Collection<Boolean>> future = ignite.compute().broadcastAsync(new UpgradePendingTreeToPerPartitionTask());
+
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return future.isDone() && expireTime < System.currentTimeMillis();
+            }
+        }, TimeUnit.SECONDS.toMillis(DURATION_SEC + 2));
+
+        for (Boolean res : future.get())
+            assertTrue(res);
+
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            assertNull(cache.get(i));
+    }
+
+    /** */
+    public static class ConfigurationClosure implements IgniteInClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public void apply(IgniteConfiguration cfg) {
+            cfg.setLocalHost("127.0.0.1");
+
+            TcpDiscoverySpi disco = new TcpDiscoverySpi();
+            disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);
+
+            cfg.setDiscoverySpi(disco);
+
+            cfg.setPeerClassLoadingEnabled(false);
+
+            cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+        }
+    }
+
+    /** */
+    public static class PostStartupClosure implements IgniteInClosure<Ignite> {
+        /** {@inheritDoc} */
+        @Override public void apply(Ignite ignite) {
+            ignite.active(true);
+
+            CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();
+            cacheCfg.setName(TEST_CACHE_NAME);
+            cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            cacheCfg.setBackups(1);
+            cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cacheCfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, DURATION_SEC)));
+            cacheCfg.setEagerTtl(true);
+            cacheCfg.setGroupName("myGroup");
+
+            IgniteCache<Object, Object> cache = ignite.createCache(cacheCfg);
+
+            saveCacheData(cache);
+
+            ignite.active(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
index b526137..f6dd736 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.compatibility.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.compatibility.PdsWithTtlCompatibilityTest;
 import org.apache.ignite.compatibility.persistence.FoldersReuseCompatibilityTest;
 import org.apache.ignite.compatibility.persistence.MigratingToWalV2SerializerWithCompactionTest;
 import org.apache.ignite.compatibility.persistence.PersistenceBasicCompatibilityTest;
@@ -35,6 +36,8 @@ public class IgniteCompatibilityBasicTestSuite {
 
         suite.addTestSuite(PersistenceBasicCompatibilityTest.class);
 
+        suite.addTestSuite(PdsWithTtlCompatibilityTest.class);
+
         suite.addTestSuite(FoldersReuseCompatibilityTest.class);
 
         suite.addTestSuite(MigratingToWalV2SerializerWithCompactionTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 008974c..727e809 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -888,6 +888,17 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = "IGNITE_DISABLE_WAL_DURING_REBALANCING";
 
     /**
+     * When set to {@code true}, Ignite will skip partitions sizes check on partition validation after rebalance has finished.
+     * Partitions sizes may differs on nodes when Expiry Policy is in use and it is ok due to lazy entry eviction mechanics.
+     *
+     * There is no need to disable partition size validation either in normal case or when expiry policy is configured for cache.
+     * But it should be disabled manually when policy is used on per entry basis to hint Ignite to skip this check.
+     *
+     * Default is {@code false}.
+     */
+    public static final String IGNITE_SKIP_PARTITION_SIZE_VALIDATION = "IGNITE_SKIP_PARTITION_SIZE_VALIDATION";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
index 39f6a03..324227b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
@@ -41,9 +41,11 @@ public class MetaPageUpdateLastAllocatedIndex extends PageDeltaRecord {
 
     /** {@inheritDoc} */
     @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
-        assert PageIO.getType(pageAddr) == PageIO.T_META || PageIO.getType(pageAddr) == PageIO.T_PART_META;
+        int type = PageIO.getType(pageAddr);
 
-        PageMetaIO io = PageMetaIO.VERSIONS.forVersion(PageIO.getVersion(pageAddr));
+        assert type == PageIO.T_META || type == PageIO.T_PART_META;
+
+        PageMetaIO io = PageIO.getPageIO(type, PageIO.getVersion(pageAddr));
 
         io.setLastAllocatedPageCount(pageAddr, lastAllocatedIdx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 5f750d5..d1bdbb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -39,9 +39,9 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
@@ -313,7 +313,7 @@ public class CacheGroupContext {
             drEnabled = true;
 
         this.caches = caches;
-   }
+    }
 
     /**
      * @param cctx Cache context.
@@ -372,8 +372,8 @@ public class CacheGroupContext {
         List<GridCacheContext> caches = this.caches;
 
         assert !sharedGroup() && caches.size() == 1 :
-            "stopping=" +  ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() +
-            ", caches=" + caches;
+            "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() +
+                ", caches=" + caches;
 
         return caches.get(0);
     }
@@ -434,6 +434,7 @@ public class CacheGroupContext {
             }
         }
     }
+
     /**
      * Adds partition unload event.
      *
@@ -514,13 +515,6 @@ public class CacheGroupContext {
     }
 
     /**
-     * @return {@code True} if fast eviction is allowed.
-     */
-    public boolean allowFastEviction() {
-        return persistenceEnabled() && !queriesEnabled();
-    }
-
-    /**
      * @return {@code True} in case replication is enabled.
      */
     public boolean isDrEnabled() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9f3686a..767c314 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -401,7 +402,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             checkObsolete();
 
             if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
-                assert row == null || row.key() == key: "Unexpected row key";
+                assert row == null || row.key() == key : "Unexpected row key";
 
                 CacheDataRow read = row == null ? cctx.offheap().read(this) : row;
 
@@ -1411,7 +1412,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             if (readThrough && needVal && old == null &&
                 (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
-                    old0 = readThrough(null, key, false, subjId, taskName);
+                old0 = readThrough(null, key, false, subjId, taskName);
 
                 old = cctx.toCacheObject(old0);
 
@@ -2462,7 +2463,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         ttlAndExpireTimeExtras(ttl, expireTime);
 
-        storeValue(val, expireTime, ver, null);
+        cctx.shared().database().checkpointReadLock();
+
+        try {
+            storeValue(val, expireTime, ver, null);
+        }
+        finally {
+            cctx.shared().database().checkpointReadUnlock();
+        }
     }
 
     /**
@@ -3108,7 +3116,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             GridCacheMvcc mvcc = mvccExtras();
 
             return mvcc != null && mvcc.isLocallyOwnedByIdOrThread(lockVer, threadId);
-        } finally {
+        }
+        finally {
             unlockEntry();
         }
     }
@@ -3347,6 +3356,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     obsolete = true;
             }
         }
+        catch (NodeStoppingException ignore) {
+            if (log.isDebugEnabled())
+                log.warning("Node is stopping while removing expired value.", ignore);
+        }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to clean up expired cache entry: " + this, e);
         }
@@ -3406,7 +3419,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         if (log.isTraceEnabled())
             log.trace("onExpired clear [key=" + key + ", entry=" + System.identityHashCode(this) + ']');
 
-        removeValue();
+        cctx.shared().database().checkpointReadLock();
+
+        try {
+            removeValue();
+        }
+        finally {
+            cctx.shared().database().checkpointReadUnlock();
+        }
 
         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
             cctx.events().addEvent(partition(),
@@ -3586,8 +3606,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param ver New entry version.
      * @param oldRow Old row if available.
      * @param predicate Optional predicate.
-     * @throws IgniteCheckedException If update failed.
+     *
      * @return {@code True} if storage was modified.
+     * @throws IgniteCheckedException If update failed.
      */
     protected boolean storeValue(
         @Nullable CacheObject val,
@@ -3599,7 +3620,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate);
 
-        cctx.offheap().invoke(cctx, key,  localPartition(), closure);
+        cctx.offheap().invoke(cctx, key, localPartition(), closure);
 
         return closure.treeOp != IgniteTree.OperationType.NOOP;
     }
@@ -4051,7 +4072,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /**
-     *  Increments public size of map.
+     * Increments public size of map.
      */
     protected void incrementMapPublicSize() {
         GridDhtLocalPartition locPart = localPartition();
@@ -4782,7 +4803,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 needUpdate = true;
             }
-            else if (updateExpireTime && expiryPlc != null && entry.val != null){
+            else if (updateExpireTime && expiryPlc != null && entry.val != null) {
                 long ttl = expiryPlc.forAccess();
 
                 if (ttl != CU.TTL_NOT_CHANGED) {
@@ -4929,7 +4950,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (entry.val == null) {
                 boolean new0 = entry.isStartVersion();
 
-                assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry +
+                assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + entry +
                     ", locNodeId=" + cctx.localNodeId() + ']';
 
                 if (!new0 && !entry.isInternal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a12c033..fa25412 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -21,12 +21,13 @@ import java.util.Map;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.GridAtomicLong;
@@ -47,7 +48,7 @@ public interface IgniteCacheOffheapManager {
      * @param grp Cache group.
      * @throws IgniteCheckedException If failed.
      */
-    public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException;;
+    public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException;
 
     /**
      * @param cctx Cache context.
@@ -142,6 +143,8 @@ public interface IgniteCacheOffheapManager {
     /**
      * @param cctx Cache context.
      * @param c Closure.
+     * @param amount Limit of processed entries by single call, {@code -1} for no limit.
+     * @return {@code True} if unprocessed expired entries remains.
      * @throws IgniteCheckedException If failed.
      */
     public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount)
@@ -167,9 +170,9 @@ public interface IgniteCacheOffheapManager {
 
     /**
      * @param cctx Cache context.
-     * @param key  Key.
-     * @param val  Value.
-     * @param ver  Version.
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
      * @param expireTime Expire time.
      * @param oldRow Old row if available.
      * @param part Partition.
@@ -537,5 +540,13 @@ public interface IgniteCacheOffheapManager {
          * @param rowCacheCleaner Rows cache cleaner.
          */
         public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner);
+
+        /**
+         * Return PendingTree for data store.
+         *
+         * @return PendingTree instance.
+         * @throws IgniteCheckedException
+         */
+        PendingEntriesTree pendingTree();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5c78eb5..bf0de02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -101,16 +101,16 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = new ConcurrentHashMap<>();
 
     /** */
-    protected PendingEntriesTree pendingEntries;
+    private PendingEntriesTree pendingEntries;
 
     /** */
-    private volatile boolean hasPendingEntries;
+    protected volatile boolean hasPendingEntries;
 
     /** */
     private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000);
 
     /** */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+    protected final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** */
     private int updateValSizeThreshold;
@@ -148,19 +148,29 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
     /** {@inheritDoc} */
     public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
+        initPendingTree(cctx);
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
+        assert !cctx.group().persistenceEnabled();
+
         if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) {
             String name = "PendingEntries";
 
-                long rootPage = allocateForTree();
+            long rootPage = allocateForTree();
 
-                pendingEntries = new PendingEntriesTree(
-                    grp,
-                    name,
-                    grp.dataRegion().pageMemory(),
-                    rootPage,
-                    grp.reuseList(),
-                    true);
-            }
+            pendingEntries = new PendingEntriesTree(
+                grp,
+                name,
+                grp.dataRegion().pageMemory(),
+                rootPage,
+                grp.reuseList(),
+                true);
+        }
     }
 
     /**
@@ -204,11 +214,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         try {
             if (grp.sharedGroup()) {
                 assert cacheId != CU.UNDEFINED_CACHE_ID;
-                assert ctx.database().checkpointLockIsHeldByThread();
 
                 for (CacheDataStore store : cacheDataStores())
                     store.clear(cacheId);
 
+                // Clear non-persistent pending tree if needed.
                 if (pendingEntries != null) {
                     PendingRow row = new PendingRow(cacheId);
 
@@ -241,6 +251,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
     }
 
+    /**
+     * @param part Partition.
+     * @return Data store for given partition, or {@code null} if there is none.
+     */
+    public CacheDataStore dataStore(int part) {
+        return grp.isLocal() ? locCacheDataStore : partDataStores.get(part);
+    }
+
     /** {@inheritDoc} */
     @Override public long cacheEntriesCount(int cacheId) {
         long size = 0;
@@ -1011,51 +1029,56 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     ) throws IgniteCheckedException {
         assert !cctx.isNear() : cctx.name();
 
-        if (hasPendingEntries && pendingEntries != null) {
-            GridCacheVersion obsoleteVer = null;
+        if (!hasPendingEntries || pendingEntries == null)
+            return false;
 
-            long now = U.currentTimeMillis();
+        GridCacheVersion obsoleteVer = null;
 
-            GridCursor<PendingRow> cur;
+        long now = U.currentTimeMillis();
 
-            if (grp.sharedGroup())
-                cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
-            else
-                cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+        GridCursor<PendingRow> cur;
 
-            if (!cur.next())
-                return false;
+        if (grp.sharedGroup())
+            cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+        else
+            cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
 
-            int cleared = 0;
+        if (!cur.next())
+            return false;
 
-            cctx.shared().database().checkpointReadLock();
+        int cleared = 0;
 
-            try {
-                do {
-                    PendingRow row = cur.get();
+        if (!busyLock.enterBusy())
+            return false;
 
-                    if (amount != -1 && cleared > amount)
-                        return true;
+        try {
+            do {
+                if (amount != -1 && cleared >= amount)
+                    return true;
 
-                    if (row.key.partition() == -1)
-                        row.key.partition(cctx.affinity().partition(row.key));
+                PendingRow row = cur.get();
 
-                    assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+                if (row.key.partition() == -1)
+                    row.key.partition(cctx.affinity().partition(row.key));
 
-                    if (pendingEntries.removex(row)) {
-                        if (obsoleteVer == null)
-                            obsoleteVer = ctx.versions().next();
+                assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
 
-                        c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
-                    }
+                if (pendingEntries.removex(row)) {
+                    if (obsoleteVer == null)
+                        obsoleteVer = ctx.versions().next();
+
+                    GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
 
-                    cleared++;
+                    if (entry != null)
+                        c.apply(entry, obsoleteVer);
                 }
-                while (cur.next());
-            }
-            finally {
-                cctx.shared().database().checkpointReadUnlock();
+
+                cleared++;
             }
+            while (cur.next());
+        }
+        finally {
+            busyLock.leaveBusy();
         }
 
         return false;
@@ -1395,15 +1418,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             if (oldRow != null) {
                 assert oldRow.link() != 0 : oldRow;
 
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+                if (pendingTree() != null && oldRow.expireTime() != 0)
+                    pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
 
                 if (newRow.link() != oldRow.link())
                     rowStore.removeRow(oldRow.link());
             }
 
-            if (pendingEntries != null && expireTime != 0) {
-                pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link()));
+            if (pendingTree() != null && expireTime != 0) {
+                pendingTree().putx(new PendingRow(cacheId, expireTime, newRow.link()));
 
                 hasPendingEntries = true;
             }
@@ -1444,8 +1467,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId :
                     "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "].";
 
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+                if (pendingTree() != null && oldRow.expireTime() != 0)
+                    pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
 
                 decrementSize(cctx.cacheId());
             }
@@ -1543,7 +1566,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** {@inheritDoc} */
         @Override public void clear(int cacheId) throws IgniteCheckedException {
             assert cacheId != CU.UNDEFINED_CACHE_ID;
-            assert ctx.database().checkpointLockIsHeldByThread();
 
             if (cacheSize(cacheId) == 0)
                 return;
@@ -1624,6 +1646,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             }
         }
 
+        /** {@inheritDoc} */
+        @Override public PendingEntriesTree pendingTree() {
+            return pendingEntries;
+        }
+
         /**
          * @param cctx Cache context.
          * @param key Key.
@@ -1676,5 +1703,4 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 return 0;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index be74eff..a199f6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -57,9 +56,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.util.deque.FastSizeDeque;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.apache.ignite.util.deque.FastSizeDeque;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
@@ -342,9 +341,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return {@code True} if partition is empty.
      */
     public boolean isEmpty() {
-        if (grp.allowFastEviction())
-            return internalSize() == 0;
-
         return store.fullSize() == 0 && internalSize() == 0;
     }
 
@@ -981,78 +977,76 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
 
         long cleared = 0;
 
-        if (!grp.allowFastEviction()) {
-            CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+        CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
 
-            try {
-                GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
+        try {
+            GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
 
-                while (it0.hasNext()) {
-                    ctx.database().checkpointReadLock();
+            while (it0.hasNext()) {
+                ctx.database().checkpointReadLock();
 
-                    try {
-                        CacheDataRow row = it0.next();
-
-                        // Do not clear fresh rows in case of single partition clearing.
-                        if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear))
-                            continue;
-
-                        if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
-                            hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
-
-                        assert hld != null;
-
-                        GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
-                            hld,
-                            hld.cctx,
-                            grp.affinity().lastVersion(),
-                            row.key(),
-                            true,
-                            false);
-
-                        if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
-                            removeEntry(cached);
-
-                            if (rec && !hld.cctx.config().isEventsDisabled()) {
-                                hld.cctx.events().addEvent(cached.partition(),
-                                    cached.key(),
-                                    ctx.localNodeId(),
-                                    (IgniteUuid)null,
-                                    null,
-                                    EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-                                    null,
-                                    false,
-                                    cached.rawGet(),
-                                    cached.hasValue(),
-                                    null,
-                                    null,
-                                    null,
-                                    false);
-                            }
-
-                            cleared++;
+                try {
+                    CacheDataRow row = it0.next();
+
+                    // Do not clear fresh rows in case of single partition clearing.
+                    if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear))
+                        continue;
+
+                    if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
+                        hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+
+                    assert hld != null;
+
+                    GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+                        hld,
+                        hld.cctx,
+                        grp.affinity().lastVersion(),
+                        row.key(),
+                        true,
+                        false);
+
+                    if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+                        removeEntry(cached);
+
+                        if (rec && !hld.cctx.config().isEventsDisabled()) {
+                            hld.cctx.events().addEvent(cached.partition(),
+                                cached.key(),
+                                ctx.localNodeId(),
+                                (IgniteUuid)null,
+                                null,
+                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                null,
+                                false,
+                                cached.rawGet(),
+                                cached.hasValue(),
+                                null,
+                                null,
+                                null,
+                                false);
                         }
-                    }
-                    catch (GridDhtInvalidPartitionException e) {
-                        assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
 
-                        break; // Partition is already concurrently cleared and evicted.
-                    }
-                    finally {
-                        ctx.database().checkpointReadUnlock();
+                        cleared++;
                     }
                 }
-            }
-            catch (NodeStoppingException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get iterator for evicted partition: " + id);
+                catch (GridDhtInvalidPartitionException e) {
+                    assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
 
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to get iterator for evicted partition: " + id, e);
+                    break; // Partition is already concurrently cleared and evicted.
+                }
+                finally {
+                    ctx.database().checkpointReadUnlock();
+                }
             }
         }
+        catch (NodeStoppingException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get iterator for evicted partition: " + id);
+
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to get iterator for evicted partition: " + id, e);
+        }
 
         return cleared;
     }
@@ -1406,37 +1400,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
         }
 
         /**
-         * Recreate cache data store after successful clearing and allowed fast eviction.
-         */
-        private void recreateCacheDataStore() {
-            assert grp.offheap() instanceof GridCacheOffheapManager;
-
-            try {
-                CacheDataStore store0 = store;
-
-                store = ((GridCacheOffheapManager) grp.offheap()).recreateCacheDataStore(store0);
-
-                // Inject row cache cleaner on store creation
-                // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group
-                if (ctx.kernalContext().query().moduleEnabled()) {
-                    GridQueryRowCacheCleaner cleaner = ctx.kernalContext().query().getIndexing()
-                            .rowCacheCleaner(grp.groupId());
-
-                    if (store != null && cleaner != null)
-                        store.setRowCacheCleaner(cleaner);
-                }
-            } catch (IgniteCheckedException e) {
-                finish(e);
-            }
-        }
-
-        /**
          * Successfully finishes the future.
          */
         public void finish() {
-            if (state() == MOVING && clear && grp.allowFastEviction())
-                recreateCacheDataStore();
-
             synchronized (this) {
                 onDone();
                 finished = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
index cc0542c..866c513 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
@@ -95,6 +95,7 @@ public class GridDhtPartitionsStateValidator {
 
         // Validate cache sizes.
         result = validatePartitionsSizes(top, messages, ignoringNodes);
+
         if (!result.isEmpty())
             throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 39f4ed1..1b79b76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -136,6 +136,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = IgniteProductVersion.fromString("2.4.3");
 
+    /**
+     * Skips partition sizes validation; useful when per-entry (not per-cache based) expiry policy is in use.
+     * See {@link IgniteSystemProperties#IGNITE_SKIP_PARTITION_SIZE_VALIDATION} for details.
+     * Default value is {@code false}.
+     */
+    private static final boolean SKIP_PARTITION_SIZE_VALIDATION = Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION);
+
     /** */
     @GridToStringExclude
     private final Object mux = new Object();
@@ -2755,13 +2762,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     grpCtx.topology() :
                     cctx.exchange().clientTopology(grpId, events().discoveryCache());
 
-            // Do not validate read or write through caches or caches with disabled rebalance.
+            // Do not validate read or write through caches or caches with disabled rebalance
+            // or non-eternal ExpiryPolicy is set or validation is disabled.
             if (grpCtx == null
                     || grpCtx.config().isReadThrough()
                     || grpCtx.config().isWriteThrough()
                     || grpCtx.config().getCacheStoreFactory() != null
                     || grpCtx.config().getRebalanceDelay() == -1
-                    || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE)
+                    || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE || SKIP_PARTITION_SIZE_VALIDATION
+                    || (grpCtx.config().getExpiryPolicyFactory() != null
+                        && !(grpCtx.config().getExpiryPolicyFactory().create() instanceof javax.cache.expiry.EternalExpiryPolicy)))
                 continue;
 
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5feaa25..d7cc623 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
@@ -49,6 +50,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -56,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl;
+import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
@@ -64,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
@@ -71,10 +76,13 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointe
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -95,7 +103,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
     private ReuseListImpl reuseList;
 
     /** {@inheritDoc} */
+    @Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
+        // No-op. Per-partition PendingTree should be used.
+    }
+
+    /** {@inheritDoc} */
     @Override protected void initDataStructures() throws IgniteCheckedException {
+        assert ctx.database().checkpointLockIsHeldByThread();
+
         Metas metas = getOrAllocateCacheMetas();
 
         RootPage reuseListRoot = metas.reuseListRoot;
@@ -122,29 +137,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
-        if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) {
-            ctx.database().checkpointReadLock();
-
-            try {
-                final String name = "PendingEntries";
-
-                RootPage pendingRootPage = indexStorage.getOrAllocateForTree(name);
-
-                pendingEntries = new PendingEntriesTree(
-                    grp,
-                    name,
-                    grp.dataRegion().pageMemory(),
-                    pendingRootPage.pageId().pageId(),
-                    reuseList,
-                    pendingRootPage.isAllocated()
-                );
-            }
-            finally {
-                ctx.database().checkpointReadUnlock();
-            }
-        }
+    /**
+     * Get internal IndexStorage. See {@link UpgradePendingTreeToPerPartitionTask} for details.
+     * @return Index storage of this offheap manager.
+     */
+    public IndexStorage getIndexStorage() {
+        return indexStorage;
     }
 
     /** {@inheritDoc} */
@@ -218,8 +216,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
                 int grpId = grp.groupId();
                 long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId());
-                long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
 
+                long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
                 try {
                     long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
 
@@ -274,7 +272,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                         if (needSnapshot) {
                             pageCnt = this.ctx.pageStore().pages(grpId, store.partId());
 
-                            io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0: pageCnt);
+                            io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt);
 
                             if (saveMeta) {
                                 saveMeta(ctx);
@@ -285,7 +283,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                             if (state == OWNING) {
                                 assert part != null;
 
-                                if(!addPartition(
+                                if (!addPartition(
                                     part,
                                     ctx.partitionStatMap(),
                                     partMetaPageAddr,
@@ -295,8 +293,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                                     this.ctx.pageStore().pages(grpId, store.partId()),
                                     store.fullSize()
                                 ))
-                                    U.warn(log,"Partition was concurrently evicted grpId=" +  grpId +
-                                            ", partitionId=" + part.id());
+                                    U.warn(log, "Partition was concurrently evicted grpId=" + grpId +
+                                        ", partitionId=" + part.id());
                             }
                             else if (state == MOVING || state == RENTING) {
                                 if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) {
@@ -333,7 +331,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                 }
             }
             else if (needSnapshot)
-                tryAddEmptyPartitionToSnapshot(store, ctx);;
+                tryAddEmptyPartitionToSnapshot(store, ctx);
         }
         else if (needSnapshot)
             tryAddEmptyPartitionToSnapshot(store, ctx);
@@ -350,8 +348,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
     private void tryAddEmptyPartitionToSnapshot(CacheDataStore store, Context ctx) {
         if (getPartition(store).state() == OWNING) {
             ctx.partitionStatMap().put(
-                    new GroupPartitionId(grp.groupId(), store.partId()),
-                    new PagesAllocationRange(0, 0));
+                new GroupPartitionId(grp.groupId(), store.partId()),
+                new PagesAllocationRange(0, 0));
         }
     }
 
@@ -362,7 +360,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
      */
     private GridDhtLocalPartition getPartition(CacheDataStore store) {
         return grp.topology().localPartition(store.partId(),
-                AffinityTopologyVersion.NONE, false, true);
+            AffinityTopologyVersion.NONE, false, true);
     }
 
     /**
@@ -385,7 +383,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
         long nextId = cntrsPageId;
 
-        while (true){
+        while (true) {
             final long curId = nextId;
             final long curPage = pageMem.acquirePage(grpId, curId);
 
@@ -542,19 +540,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
      * @param currAllocatedPageCnt total number of pages allocated for partition <code>[partition, grpId]</code>
      */
     private static boolean addPartition(
-            GridDhtLocalPartition part,
-            final PartitionAllocationMap map,
-            final long metaPageAddr,
-            final PageMetaIO io,
-            final int grpId,
-            final int partId,
-            final int currAllocatedPageCnt,
-            final long partSize
+        GridDhtLocalPartition part,
+        final PartitionAllocationMap map,
+        final long metaPageAddr,
+        final PageMetaIO io,
+        final int grpId,
+        final int partId,
+        final int currAllocatedPageCnt,
+        final long partSize
     ) {
         if (part != null) {
             boolean reserved = part.reserve();
 
-            if(!reserved)
+            if (!reserved)
                 return false;
         }
         else
@@ -596,43 +594,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
     }
 
-    /**
-     * Destroys given {@code store} and creates new with the same update counters as in given.
-     *
-     * @param store Store to destroy.
-     * @return New cache data store.
-     * @throws IgniteCheckedException If failed.
-     */
-    public CacheDataStore recreateCacheDataStore(CacheDataStore store) throws IgniteCheckedException {
-        long updCounter = store.updateCounter();
-        long initUpdCounter = store.initialUpdateCounter();
-
-        int p = store.partId();
-
-        PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
-
-        int tag = pageMemory.invalidate(grp.groupId(), p);
-
-        ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
-
-        CacheDataStore store0;
-
-        partStoreLock.lock(p);
-
-        try {
-            store0 = createCacheDataStore0(p);
-            store0.updateCounter(updCounter);
-            store0.updateInitialCounter(initUpdCounter);
-
-            partDataStores.put(p, store0);
-        }
-        finally {
-            partStoreLock.unlock(p);
-        }
-
-        return store0;
-    }
-
     /** {@inheritDoc} */
     @Override public void onPartitionCounterUpdated(int part, long cntr) {
         CacheDataStore store = partDataStores.get(part);
@@ -743,7 +704,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
                 return new Metas(
                     new RootPage(new FullPageId(metastoreRoot, grpId), allocated),
-                    new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
+                    new RootPage(new FullPageId(reuseListRoot, grpId), allocated),
+                    null);
             }
             finally {
                 pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated);
@@ -787,6 +749,47 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         return iterator;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean expire(
+        GridCacheContext cctx,
+        IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
+        int amount
+    ) throws IgniteCheckedException {
+        assert !cctx.isNear() : cctx.name();
+
+        if (!hasPendingEntries)
+            return false;
+
+        if (!busyLock.enterBusy())
+            return false;
+
+        try {
+            int cleared = 0;
+
+            for (CacheDataStore store : cacheDataStores()) {
+                // -1 means "no limit" and must be passed through as is: "-1 - cleared" would act as a real limit.
+                cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, amount == -1 ? -1 : amount - cleared);
+                if (amount != -1 && cleared >= amount)
+                    return true;
+            }
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long expiredSize() throws IgniteCheckedException {
+        long total = 0L;
+        // Sum the pending-entry counts reported by every partition data store.
+        for (CacheDataStore dataStore : cacheDataStores())
+            total += ((GridCacheDataStore)dataStore).expiredSize();
+
+        return total;
+    }
+
     /**
      * Calculates free space of all partition data stores - number of bytes available for use in allocated pages.
      *
@@ -1098,13 +1101,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         @GridToStringInclude
         private final RootPage treeRoot;
 
+        /** */
+        @GridToStringInclude
+        private final RootPage pendingTreeRoot;
+
         /**
          * @param treeRoot Metadata storage root.
          * @param reuseListRoot Reuse list root.
          */
-        Metas(RootPage treeRoot, RootPage reuseListRoot) {
+        Metas(RootPage treeRoot, RootPage reuseListRoot, RootPage pendingTreeRoot) {
             this.treeRoot = treeRoot;
             this.reuseListRoot = reuseListRoot;
+            this.pendingTreeRoot = pendingTreeRoot;
         }
 
         /** {@inheritDoc} */
@@ -1116,7 +1124,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
     /**
      *
      */
-    private class GridCacheDataStore implements CacheDataStore {
+    public class GridCacheDataStore implements CacheDataStore {
         /** */
         private final int partId;
 
@@ -1127,6 +1135,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         private volatile CacheFreeListImpl freeList;
 
         /** */
+        private PendingEntriesTree pendingTree;
+
+        /** */
         private volatile CacheDataStore delegate;
 
         /** */
@@ -1164,11 +1175,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                     return null;
             }
 
-            IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
-
-            dbMgr.checkpointReadLock();
-
             if (init.compareAndSet(false, true)) {
+                IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
+
+                dbMgr.checkpointReadLock();
                 try {
                     Metas metas = getOrAllocatePartitionMetas();
 
@@ -1183,6 +1193,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                         ctx.wal(),
                         reuseRoot.pageId().pageId(),
                         reuseRoot.isAllocated()) {
+                        /** {@inheritDoc} */
                         @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
                             assert grp.shared().database().checkpointLockIsHeldByThread();
 
@@ -1201,6 +1212,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                         rowStore,
                         treeRoot.pageId().pageId(),
                         treeRoot.isAllocated()) {
+                        /** {@inheritDoc} */
+                        @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
+                            assert grp.shared().database().checkpointLockIsHeldByThread();
+
+                            return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
+                        }
+                    };
+
+                    RootPage pendingTreeRoot = metas.pendingTreeRoot;
+
+                    final PendingEntriesTree pendingTree0 = new PendingEntriesTree(
+                        grp,
+                        "PendingEntries-" + partId,
+                        grp.dataRegion().pageMemory(),
+                        pendingTreeRoot.pageId().pageId(),
+                        reuseList,
+                        pendingTreeRoot.isAllocated()) {
+                        /** {@inheritDoc} */
                         @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
                             assert grp.shared().database().checkpointLockIsHeldByThread();
 
@@ -1210,7 +1239,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
                     PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-                    delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree);
+                    delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree) {
+                        /** {@inheritDoc} */
+                        @Override public PendingEntriesTree pendingTree() {
+                            return pendingTree0;
+                        }
+                    };
+
+                    pendingTree = pendingTree0;
+
+                    if (!hasPendingEntries && pendingTree0.size() > 0)
+                        hasPendingEntries = true;
 
                     int grpId = grp.groupId();
                     long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
@@ -1258,8 +1297,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                 }
             }
             else {
-                dbMgr.checkpointReadUnlock();
-
                 U.await(latch);
 
                 delegate0 = delegate;
@@ -1280,13 +1317,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
             int grpId = grp.groupId();
             long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
+
             long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
             try {
                 boolean allocated = false;
-                long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
+                boolean pendingTreeAllocated = false;
 
+                long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
                 try {
-                    long treeRoot, reuseListRoot;
+                    long treeRoot, reuseListRoot, pendingTreeRoot;
 
                     // Initialize new page.
                     if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
@@ -1296,22 +1335,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
                         treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
                         reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
+                        pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
 
                         assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
                         assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
+                        assert PageIdUtils.flag(pendingTreeRoot) == PageMemory.FLAG_DATA;
 
                         io.setTreeRoot(pageAddr, treeRoot);
                         io.setReuseListRoot(pageAddr, reuseListRoot);
+                        io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
 
                         if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
-                            wal.log(new MetaPageInitRecord(
-                                grpId,
-                                partMetaId,
-                                io.getType(),
-                                io.getVersion(),
-                                treeRoot,
-                                reuseListRoot
-                            ));
+                            wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize()));
 
                         allocated = true;
                     }
@@ -1321,6 +1356,33 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                         treeRoot = io.getTreeRoot(pageAddr);
                         reuseListRoot = io.getReuseListRoot(pageAddr);
 
+                        int pageVersion = PagePartitionMetaIO.getVersion(pageAddr);
+
+                        if (pageVersion < 2) {
+                            assert pageVersion == 1;
+
+                            if (log.isDebugEnabled())
+                                log.info("Upgrade partition meta page version: [part=" + partId +
+                                    ", grpId=" + grpId + ", oldVer=" + pageVersion +
+                                    ", newVer=" + io.getVersion()
+                                );
+
+                            io = PagePartitionMetaIO.VERSIONS.latest();
+
+                            ((PagePartitionMetaIOV2)io).upgradePage(pageAddr);
+
+                            pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
+
+                            io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
+
+                            if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
+                                wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize()));
+
+                            pendingTreeAllocated = true;
+                        }
+                        else
+                            pendingTreeRoot = io.getPendingTreeRoot(pageAddr);
+
                         if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA)
                             throw new StorageException("Wrong tree root page id flag: treeRoot="
                                 + U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId);
@@ -1328,14 +1390,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                         if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA)
                             throw new StorageException("Wrong reuse list root page id flag: reuseListRoot="
                                 + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId);
+
+                        if (PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_DATA)
+                            throw new StorageException("Wrong pending tree root page id flag: reuseListRoot="
+                                + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId);
                     }
 
                     return new Metas(
                         new RootPage(new FullPageId(treeRoot, grpId), allocated),
-                        new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
+                        new RootPage(new FullPageId(reuseListRoot, grpId), allocated),
+                        new RootPage(new FullPageId(pendingTreeRoot, grpId), allocated || pendingTreeAllocated));
                 }
                 finally {
-                    pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated);
+                    pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated || pendingTreeAllocated);
                 }
             }
             finally {
@@ -1485,6 +1552,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             long expireTime,
             @Nullable CacheDataRow oldRow
         ) throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             delegate.update(cctx, key, val, ver, expireTime, oldRow);
@@ -1498,6 +1567,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             GridCacheVersion ver,
             long expireTime,
             @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             return delegate.createRow(cctx, key, val, ver, expireTime, oldRow);
@@ -1506,6 +1577,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         /** {@inheritDoc} */
         @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
             throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             delegate.invoke(cctx, key, c);
@@ -1514,6 +1587,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         /** {@inheritDoc} */
         @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId)
             throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             delegate.remove(cctx, key, partId);
@@ -1583,10 +1658,142 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
         /** {@inheritDoc} */
         @Override public void clear(int cacheId) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
+            CacheDataStore delegate0 = init0(true);
 
-            if (delegate != null)
-                delegate.clear(cacheId);
+            if (delegate0 == null)
+                return;
+
+            ctx.database().checkpointReadLock();
+            try {
+                // Remove every persistent pending-tree row bounded by this cache id before clearing the data.
+                if (pendingTree != null) {
+                    PendingRow row = new PendingRow(cacheId);
+
+                    GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
+
+                    while (cursor.next()) {
+                        PendingRow row0 = cursor.get();
+
+                        assert row0.link != 0 : row0;
+
+                        boolean res = pendingTree.removex(row0);
+
+                        assert res;
+                    }
+                }
+
+                delegate0.clear(cacheId);
+            }
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
+        }
+
+        /**
+         * Returns how many entries of this partition are currently tracked in the pending tree.
+         *
+         * @return Size of the pending tree, or {@code 0} if the underlying store is not available.
+         * @throws IgniteCheckedException If store initialization or reading the tree size fails.
+         */
+        public long expiredSize() throws IgniteCheckedException {
+            CacheDataStore store0 = init0(true);
+
+            return store0 != null ? pendingTree.size() : 0;
+        }
+
+        /**
+         * Removes expired rows from this partition's pending tree and applies the expiry closure to removed entries.
+         *
+         * @param cctx Cache context.
+         * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details.
+         * @param amount Limit of processed entries by single call, {@code -1} for no limit.
+         * @return Number of pending rows processed by this call (at most {@code amount} unless it is {@code -1}).
+         * @throws IgniteCheckedException If failed.
+         */
+        public int purgeExpired(GridCacheContext cctx,
+            IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
+            int amount) throws IgniteCheckedException {
+            CacheDataStore delegate0 = init0(true);
+
+            if (delegate0 == null || pendingTree == null)
+                return 0;
+
+            GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
+
+            // Skip non-owned partitions and partitions with nothing pending.
+            if (part == null || part.state() != OWNING || pendingTree.size() == 0)
+                return 0;
+
+            cctx.shared().database().checkpointReadLock();
+            try {
+                if (!part.reserve())
+                    return 0;
+
+                try {
+                    if (part.state() != OWNING)
+                        return 0;
+
+                    long now = U.currentTimeMillis();
+
+                    GridCursor<PendingRow> cur;
+
+                    if (grp.sharedGroup())
+                        cur = pendingTree.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+                    else
+                        cur = pendingTree.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+
+                    if (!cur.next())
+                        return 0;
+
+                    GridCacheVersion obsoleteVer = null;
+
+                    int cleared = 0;
+
+                    do {
+                        if (amount != -1 && cleared >= amount)
+                            return cleared;
+
+                        PendingRow row = cur.get();
+
+                        assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+
+                        row.key.partition(partId);
+
+                        if (pendingTree.removex(row)) {
+                            if (obsoleteVer == null)
+                                obsoleteVer = ctx.versions().next();
+
+                            GridCacheEntryEx e1 = cctx.cache().entryEx(row.key);
+
+                            if (e1 != null)
+                                c.apply(e1, obsoleteVer);
+                        }
+
+                        cleared++;
+                    }
+                    while (cur.next());
+
+                    return cleared;
+                }
+                finally {
+                    part.release();
+                }
+            }
+            finally {
+                cctx.shared().database().checkpointReadUnlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public PendingEntriesTree pendingTree() {
+            try {
+                // A null delegate means the store is not available, so there is no tree to expose.
+                return init0(true) != null ? pendingTree : null;
+            }
+            catch (IgniteCheckedException ex) {
+                // This method cannot throw checked exceptions, so rethrow as unchecked.
+                throw new IgniteException(ex);
+            }
         }
     }