You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/11/20 13:06:06 UTC

[1/2] ignite git commit: IGNITE-9999 Added verbose logging for node recovery - Fixes #5371

Repository: ignite
Updated Branches:
  refs/heads/master 74f312e0c -> b48a291e1


http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
index 425f9b9..93cc074 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java
@@ -21,21 +21,24 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.OpenOption;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import com.google.common.collect.Lists;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -45,12 +48,17 @@ import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerConfiguration;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -94,7 +102,6 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
         );
 
         DataStorageConfiguration dsCfg = new DataStorageConfiguration()
-            .setAlwaysWriteFullPages(true)
             .setWalMode(WALMode.LOG_ONLY)
             .setCheckpointFrequency(1024 * 1024 * 1024) // Disable automatic checkpoints.
             .setDefaultDataRegionConfiguration(
@@ -110,7 +117,11 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
         if (ioFactory != null)
             dsCfg.setFileIOFactory(ioFactory);
 
-        cfg.setPluginConfigurations(new PageMemoryTrackerConfiguration().setEnabled(false).setCheckPagesOnCheckpoint(true));
+        TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
+
+        spi.record(GridDhtPartitionDemandMessage.class);
+
+        cfg.setCommunicationSpi(spi);
 
         return cfg;
     }
@@ -130,16 +141,16 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
      * @param cacheMode Cache mode.
      * @param atomicityMode Atomicity mode.
      */
-    private CacheConfiguration<Object, Object> cacheConfiguration(String name, @Nullable String groupName, CacheMode cacheMode, CacheAtomicityMode atomicityMode) {
-        CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>();
+    protected CacheConfiguration<Object, Object> cacheConfiguration(String name, @Nullable String groupName, CacheMode cacheMode, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>(name)
+            .setGroupName(groupName)
+            .setCacheMode(cacheMode)
+            .setAtomicityMode(atomicityMode)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32));
 
-        cfg.setGroupName(groupName);
-        cfg.setName(name);
-        cfg.setCacheMode(cacheMode);
-        cfg.setAtomicityMode(atomicityMode);
-        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        cfg.setBackups(2);
-        cfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        cfg.setIndexedTypes(Integer.class, Integer.class);
 
         return cfg;
     }
@@ -172,25 +183,23 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
 
         IgniteEx node = grid(2);
 
-        AggregateCacheLoader aggCacheLoader = new AggregateCacheLoader(node);
-
-        aggCacheLoader.start();
+        AggregateCacheLoader cacheLoader = new AggregateCacheLoader(node);
 
-        U.sleep(3000);
+        cacheLoader.loadByTime(5_000).get();
 
         forceCheckpoint();
 
-        U.sleep(3000);
-
-        aggCacheLoader.stop();
+        cacheLoader.loadByTime(5_000).get();
 
         stopGrid(2, true);
 
-        startGrid(2);
+        node = startGrid(2);
 
         awaitPartitionMapExchange();
 
-        aggCacheLoader.consistencyCheck(grid(2));
+        cacheLoader.consistencyCheck(node);
+
+        checkNoRebalanceAfterRecovery();
     }
 
     /**
@@ -203,29 +212,27 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
 
         IgniteEx node = grid(2);
 
-        AggregateCacheLoader aggCacheLoader = new AggregateCacheLoader(node);
-
-        aggCacheLoader.start();
+        AggregateCacheLoader cacheLoader = new AggregateCacheLoader(node);
 
-        U.sleep(3000);
+        cacheLoader.loadByTime(5_000).get();
 
         forceCheckpoint();
 
-        U.sleep(3000);
-
-        aggCacheLoader.stop();
+        cacheLoader.loadByTime(5_000).get();
 
         stopGrid(2, true);
 
         crd.cluster().active(false);
 
-        startGrid(2);
+        node = startGrid(2);
 
         crd.cluster().active(true);
 
         awaitPartitionMapExchange();
 
-        aggCacheLoader.consistencyCheck(grid(2));
+        checkNoRebalanceAfterRecovery();
+
+        cacheLoader.consistencyCheck(node);
     }
 
     /**
@@ -268,17 +275,13 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
 
         node.getOrCreateCaches(dynamicCaches);
 
-        AggregateCacheLoader aggCacheLoader = new AggregateCacheLoader(node);
-
-        aggCacheLoader.start();
+        AggregateCacheLoader cacheLoader = new AggregateCacheLoader(node);
 
-        U.sleep(3000);
+        cacheLoader.loadByTime(5_000).get();
 
         forceCheckpoint();
 
-        U.sleep(3000);
-
-        aggCacheLoader.stop();
+        cacheLoader.loadByTime(5_000).get();
 
         stopGrid(2, true);
 
@@ -286,8 +289,10 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
 
         awaitPartitionMapExchange();
 
+        checkNoRebalanceAfterRecovery();
+
         for (int idx = 0; idx < 3; idx++)
-            aggCacheLoader.consistencyCheck(grid(idx));
+            cacheLoader.consistencyCheck(grid(idx));
     }
 
     /**
@@ -300,17 +305,13 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
 
         IgniteEx node = grid(2);
 
-        AggregateCacheLoader aggCacheLoader = new AggregateCacheLoader(node);
+        AggregateCacheLoader cacheLoader = new AggregateCacheLoader(node);
 
-        aggCacheLoader.start();
-
-        U.sleep(3000);
+        cacheLoader.loadByTime(5_000).get();
 
         forceCheckpoint();
 
-        U.sleep(3000);
-
-        aggCacheLoader.stop();
+        cacheLoader.loadByTime(5_000).get();
 
         stopGrid(2, true);
 
@@ -323,34 +324,26 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
         awaitPartitionMapExchange();
 
         for (int idx = 0; idx < 3; idx++)
-            aggCacheLoader.consistencyCheck(grid(idx));
+            cacheLoader.consistencyCheck(grid(idx));
     }
 
     /**
      *
      */
     public void testRecoveryOnCrushDuringCheckpointOnNodeStart() throws Exception {
-        // Crash recovery fails because of the bug in pages recycling.
-        // Test passes if don't perform removes in cache loader.
-        fail("https://issues.apache.org/jira/browse/IGNITE-9303");
-
         IgniteEx crd = (IgniteEx) startGridsMultiThreaded(3, false);
 
         crd.cluster().active(true);
 
         IgniteEx node = grid(2);
 
-        AggregateCacheLoader aggCacheLoader = new AggregateCacheLoader(node);
-
-        aggCacheLoader.start();
+        AggregateCacheLoader cacheLoader = new AggregateCacheLoader(node);
 
-        U.sleep(3000);
+        cacheLoader.loadByTime(5_000).get();
 
         forceCheckpoint();
 
-        U.sleep(3000);
-
-        aggCacheLoader.stop();
+        cacheLoader.loadByTime(5_000).get();
 
         stopGrid(2, false);
 
@@ -363,6 +356,18 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
         }
         catch (Exception expected) { }
 
+        // Wait until the node leaves the cluster.
+        GridTestUtils.waitForCondition(() -> {
+            try {
+                grid(2);
+            }
+            catch (IgniteIllegalStateException e) {
+                return true;
+            }
+
+            return false;
+        }, getTestTimeout());
+
         ioFactory = null;
 
         // Start node again and check recovery.
@@ -370,8 +375,10 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
 
         awaitPartitionMapExchange();
 
+        checkNoRebalanceAfterRecovery();
+
         for (int idx = 0; idx < 3; idx++)
-            aggCacheLoader.consistencyCheck(grid(idx));
+            cacheLoader.consistencyCheck(grid(idx));
     }
 
     /** {@inheritDoc} */
@@ -381,7 +388,30 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
-        return 600 * 1000;
+        return 120 * 1000;
+    }
+
+    /**
+     * Checks that there was no rebalance for any cache (excluding the system cache).
+     */
+    private void checkNoRebalanceAfterRecovery() {
+        int sysCacheGroupId = CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME);
+
+        List<Ignite> nodes = G.allGrids();
+
+        for (Ignite node : nodes) {
+            TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(node);
+
+            List<Integer> rebalancedGroups = spi.recordedMessages(true).stream()
+                .map(msg -> (GridDhtPartitionDemandMessage) msg)
+                .map(msg -> msg.groupId())
+                .filter(grpId -> grpId != sysCacheGroupId)
+                .distinct()
+                .collect(Collectors.toList());
+
+            Assert.assertTrue("There was unexpected rebalance for some groups" +
+                    " [node=" + node.name() + ", groups=" + rebalancedGroups + ']', rebalancedGroups.isEmpty());
+        }
     }
 
     /**
@@ -389,59 +419,50 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
      */
     private static class AggregateCacheLoader {
         /** Ignite. */
-        IgniteEx ignite;
-
-        /** Stop flag. */
-        AtomicBoolean stopFlag;
+        final IgniteEx ignite;
 
         /** Cache loaders. */
-        Map<CacheLoader, IgniteInternalFuture> cacheLoaders;
+        final List<CacheLoader> cacheLoaders;
 
         /**
          * @param ignite Ignite.
          */
         public AggregateCacheLoader(IgniteEx ignite) {
             this.ignite = ignite;
+
+            List<CacheLoader> cacheLoaders = new ArrayList<>();
+
+            for (String cacheName : ignite.cacheNames())
+                cacheLoaders.add(new CacheLoader(ignite, cacheName));
+
+            this.cacheLoaders = cacheLoaders;
         }
 
         /**
-         *
+         * @param timeMillis Loading time in milliseconds.
          */
-        public void start() {
-            if (stopFlag != null && !stopFlag.get())
-                throw new IllegalStateException("Cache loaders must be stopped before start again");
-
-            stopFlag = new AtomicBoolean();
-            cacheLoaders = new HashMap<>();
+        public IgniteInternalFuture<?> loadByTime(int timeMillis) {
+            GridCompoundFuture<?, ?> loadFut = new GridCompoundFuture();
 
-            for (String cacheName : ignite.cacheNames()) {
-                CacheLoader loader = new CacheLoader(ignite, stopFlag, cacheName);
+            for (CacheLoader cacheLoader : cacheLoaders) {
+                long endTime = U.currentTimeMillis() + timeMillis;
 
-                IgniteInternalFuture loadFuture = GridTestUtils.runAsync(loader);
+                cacheLoader.stopPredicate = it -> U.currentTimeMillis() >= endTime;
 
-                cacheLoaders.put(loader, loadFuture);
+                loadFut.add(GridTestUtils.runAsync(cacheLoader));
             }
-        }
 
-        /**
-         *
-         */
-        public void stop() throws IgniteCheckedException {
-            if (stopFlag != null)
-                stopFlag.set(true);
+            loadFut.markInitialized();
 
-            if (cacheLoaders != null)
-                for (IgniteInternalFuture loadFuture : cacheLoaders.values())
-                    loadFuture.get();
+            return loadFut;
         }
 
         /**
-         * @param ignite Ignite.
+         * @param ignite Ignite node to check consistency from.
          */
         public void consistencyCheck(IgniteEx ignite) {
-            if (cacheLoaders != null)
-                for (CacheLoader cacheLoader : cacheLoaders.keySet())
-                    cacheLoader.consistencyCheck(ignite);
+            for (CacheLoader cacheLoader : cacheLoaders)
+                cacheLoader.consistencyCheck(ignite);
         }
     }
 
@@ -455,38 +476,38 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
         /** Ignite. */
         final IgniteEx ignite;
 
-        /** Stop flag. */
-        final AtomicBoolean stopFlag;
+        /** Stop predicate. */
+        volatile Predicate<IgniteEx> stopPredicate;
 
         /** Cache name. */
         final String cacheName;
 
         /** Local cache. */
-        final Map<Object, Object> locCache = new ConcurrentHashMap<>();
+        final Map<Integer, TestValue> locCache = new ConcurrentHashMap<>();
 
         /**
          * @param ignite Ignite.
-         * @param stopFlag Stop flag.
          * @param cacheName Cache name.
          */
-        public CacheLoader(IgniteEx ignite, AtomicBoolean stopFlag, String cacheName) {
+        public CacheLoader(IgniteEx ignite, String cacheName) {
             this.ignite = ignite;
-            this.stopFlag = stopFlag;
             this.cacheName = cacheName;
         }
 
         /** {@inheritDoc} */
         @Override public void run() {
-            while (!stopFlag.get()) {
+            final Predicate<IgniteEx> predicate = stopPredicate;
+
+            while (!predicate.test(ignite)) {
                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                 int key = rnd.nextInt(KEYS_SPACE);
 
                 boolean remove = rnd.nextInt(100) <= 20;
 
-                IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheName);
-
                 try {
+                    IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheName);
+
                     if (remove) {
                         cache.remove(key);
 
@@ -496,10 +517,15 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
                         int[] payload = new int[KEYS_SPACE];
                         Arrays.fill(payload, key);
 
-                        cache.put(key, payload);
+                        TestValue val = new TestValue(key, payload);
 
-                        locCache.put(key, payload);
+                        cache.put(key, val);
+
+                        locCache.put(key, val);
                     }
+
+                    // Throttle against GC.
+                    U.sleep(1);
                 }
                 catch (Exception ignored) { }
             }
@@ -509,14 +535,14 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
          *
          */
         public void consistencyCheck(IgniteEx ignite) {
-            IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheName);
+            IgniteCache<Integer, TestValue> cache = ignite.getOrCreateCache(cacheName);
 
             for (int key = 0; key < KEYS_SPACE; key++) {
-                int[] expectedValue = (int[]) locCache.get(key);
-                int[] actualValue = (int[]) cache.get(key);
+                TestValue expectedVal = locCache.get(key);
+                TestValue actualVal = cache.get(key);
 
                 Assert.assertEquals("Consistency check failed for: " + cache.getName() + ", key=" + key,
-                    arrayToString(expectedValue), arrayToString(actualValue));
+                    expectedVal, actualVal);
             }
         }
 
@@ -566,21 +592,45 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param arr Array.
+     * Test payload with indexed field.
      */
-    static String arrayToString(int[] arr) {
-        if (arr == null)
-            return "null";
+    static class TestValue {
+        /** Indexed field. */
+        @QuerySqlField(index = true)
+        private final int indexedField;
 
-        StringBuilder sb = new StringBuilder();
+        /** Payload. */
+        private final int[] payload;
 
-        sb.append('[');
+        /**
+         * @param indexedField Indexed field.
+         * @param payload Payload.
+         */
+        public TestValue(int indexedField, int[] payload) {
+            this.indexedField = indexedField;
+            this.payload = payload;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue testValue = (TestValue) o;
+
+            return indexedField == testValue.indexedField &&
+                Arrays.equals(payload, testValue.payload);
+        }
 
-        for (int i = 0; i < Math.min(arr.length, 10); i++)
-            sb.append(i).append(",");
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = Objects.hash(indexedField);
 
-        sb.append(']');
+            result = 31 * result + Arrays.hashCode(payload);
 
-        return sb.toString();
+            return result;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
index 93c4047..7e17271 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
@@ -160,64 +160,6 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Test node stopping & recovering on start marker writing fail during activation.
-     *
-     * @throws Exception If test failed.
-     */
-    public void testRecoveringOnNodeStartMarkerWriteFail() throws Exception {
-        // Fail to write node start marker tmp file at the second checkpoint. Pass only initial checkpoint.
-        ioFactory = new FilteringFileIOFactory("started.bin" + FilePageStoreManager.TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20));
-
-        IgniteEx grid = startGrid(0);
-        grid.cluster().active(true);
-
-        for (int i = 0; i < 1000; i++) {
-            byte payload = (byte) i;
-            byte[] data = new byte[2048];
-            Arrays.fill(data, payload);
-
-            grid.cache(CACHE_NAME).put(i, data);
-        }
-
-        stopAllGrids();
-
-        boolean activationFailed = false;
-        try {
-            grid = startGrid(0);
-        }
-        catch (IgniteCheckedException e) {
-            boolean interrupted = Thread.interrupted();
-
-            if (interrupted)
-                log.warning("Ignore interrupted excpetion [" +
-                    "thread=" + Thread.currentThread().getName() + ']', e);
-
-            activationFailed = true;
-        }
-
-        Assert.assertTrue("Ignite instance startup must be failed", activationFailed);
-
-        // Grid should be automatically stopped after checkpoint fail.
-        awaitStop(grid);
-
-        // Grid should be successfully recovered after stopping.
-        ioFactory = null;
-
-        IgniteEx recoveredGrid = startGrid(0);
-        recoveredGrid.cluster().active(true);
-
-        for (int i = 0; i < 1000; i++) {
-            byte payload = (byte) i;
-            byte[] data = new byte[2048];
-            Arrays.fill(data, payload);
-
-            byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i);
-            Assert.assertArrayEquals(data, actualData);
-        }
-    }
-
-
-    /**
      * Test node stopping & recovering on checkpoint begin fail.
      *
      * @throws Exception If test failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
index 906f191..1a1d449 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java
@@ -23,7 +23,6 @@ import java.nio.file.FileVisitResult;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.HashMap;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
@@ -178,7 +177,7 @@ public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTes
         boolean fail = false;
 
         try (WALIterator it = sharedContext.wal().replay(null)) {
-            dbMgr.applyUpdatesOnRecovery(it, (tup) -> true, (entry) -> true, new HashMap<>());
+            dbMgr.applyUpdatesOnRecovery(it, (ptr, rec) -> true, (entry) -> true);
         }
         catch (IgniteCheckedException e) {
             if (nodeStopPoint.needCleanUp)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 3374860..5bf7e7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -387,13 +387,12 @@ public class WalCompactionTest extends GridCommonAbstractTest {
 
         File[] cpMarkers = cpMarkersDir.listFiles(new FilenameFilter() {
             @Override public boolean accept(File dir, String name) {
-                return !(
-                    name.equals(cpMarkersToSave[0].getName()) ||
-                    name.equals(cpMarkersToSave[1].getName()) ||
-                    name.equals(cpMarkersToSave[2].getName()) ||
-                    name.equals(cpMarkersToSave[3].getName()) ||
-                    name.equals(cpMarkersToSave[4].getName())
-                );
+                for (File cpMarker : cpMarkersToSave) {
+                    if (cpMarker.getName().equals(name))
+                        return false;
+                }
+
+                return true;
             }
         });
 


[2/2] ignite git commit: IGNITE-9999 Added verbose logging for node recovery - Fixes #5371

Posted by ag...@apache.org.
IGNITE-9999 Added verbose logging for node recovery - Fixes #5371


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b48a291e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b48a291e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b48a291e

Branch: refs/heads/master
Commit: b48a291e1a2fc531085cba3b60ff8647ccc1639e
Parents: 74f312e
Author: Pavel Kovalenko <jo...@gmail.com>
Authored: Tue Nov 20 15:45:39 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Nov 20 15:56:59 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   7 +
 .../processors/cache/CacheGroupContext.java     | 148 -------
 .../processors/cache/GridCacheProcessor.java    |  44 +-
 .../cache/IgniteCacheOffheapManager.java        |  11 +
 .../cache/IgniteCacheOffheapManagerImpl.java    |   7 +
 .../GridDhtPartitionsExchangeFuture.java        |  13 +-
 .../persistence/DatabaseLifecycleListener.java  |  43 +-
 .../GridCacheDatabaseSharedManager.java         | 425 +++++++++----------
 .../persistence/GridCacheOffheapManager.java    | 140 ++++++
 .../IgniteCacheDatabaseSharedManager.java       |  12 +-
 .../cache/persistence/wal/FileWALPointer.java   |   2 +-
 .../wal/serializer/RecordDataV1Serializer.java  |   2 +-
 .../db/IgniteLogicalRecoveryTest.java           | 284 ++++++++-----
 .../file/IgnitePdsDiskErrorsRecoveringTest.java |  58 ---
 .../IgniteNodeStoppedDuringDisableWALTest.java  |   3 +-
 .../persistence/db/wal/WalCompactionTest.java   |  13 +-
 16 files changed, 619 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 3f71642..ccf7ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1040,6 +1040,13 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_ALLOW_START_CACHES_IN_PARALLEL = "IGNITE_ALLOW_START_CACHES_IN_PARALLEL";
 
     /**
+     * Enables logging of additional information about all restored partitions after the binary and logical recovery phases.
+     *
+     * Default is {@code true}.
+     */
+    public static final String IGNITE_RECOVERY_VERBOSE_LOGGING = "IGNITE_RECOVERY_VERBOSE_LOGGING";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 95fc08f..fc4f79d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,17 +41,11 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
-import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -170,9 +163,6 @@ public class CacheGroupContext {
     /** Flag indicates that cache group is under recovering and not attached to topology. */
     private final AtomicBoolean recoveryMode;
 
-    /** Flag indicates that all group partitions have restored their state from page memory / disk. */
-    private volatile boolean partitionStatesRestored;
-
     /**
      * @param ctx Context.
      * @param grpId Group ID.
@@ -791,144 +781,6 @@ public class CacheGroupContext {
     }
 
     /**
-     * Pre-create partitions that resides in page memory or WAL and restores their state.
-     */
-    public long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException {
-        if (isLocal() || !affinityNode() || !dataRegion().config().isPersistenceEnabled())
-            return 0;
-
-        if (partitionStatesRestored)
-            return 0;
-
-        long processed = 0;
-
-        PageMemoryEx pageMem = (PageMemoryEx)dataRegion().pageMemory();
-
-        for (int p = 0; p < affinity().partitions(); p++) {
-            PartitionRecoverState recoverState = partitionRecoveryStates.get(new GroupPartitionId(grpId, p));
-
-            if (ctx.pageStore().exists(grpId, p)) {
-                ctx.pageStore().ensure(grpId, p);
-
-                if (ctx.pageStore().pages(grpId, p) <= 1) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition on recovery (pages less than 1) " +
-                            "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
-
-                    continue;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Creating partition on recovery (exists in page store) " +
-                        "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
-
-                processed++;
-
-                GridDhtLocalPartition part = topology().forceCreatePartition(p);
-
-                offheap().onPartitionInitialCounterUpdated(p, 0);
-
-                ctx.database().checkpointReadLock();
-
-                try {
-                    long partMetaId = pageMem.partitionMetaPageId(grpId, p);
-                    long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
-
-                    try {
-                        long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
-
-                        boolean changed = false;
-
-                        try {
-                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
-
-                            if (recoverState != null) {
-                                io.setPartitionState(pageAddr, (byte) recoverState.stateId());
-
-                                changed = updateState(part, recoverState.stateId());
-
-                                if (recoverState.stateId() == GridDhtPartitionState.OWNING.ordinal()
-                                    || (recoverState.stateId() == GridDhtPartitionState.MOVING.ordinal()
-                                    && part.initialUpdateCounter() < recoverState.updateCounter())) {
-                                    part.initialUpdateCounter(recoverState.updateCounter());
-
-                                    changed = true;
-                                }
-
-                                if (log.isInfoEnabled())
-                                    log.warning("Restored partition state (from WAL) " +
-                                        "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() + "]");
-                            }
-                            else {
-                                int stateId = (int) io.getPartitionState(pageAddr);
-
-                                changed = updateState(part, stateId);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Restored partition state (from page memory) " +
-                                        "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + "]");
-                            }
-                        }
-                        finally {
-                            pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
-                        }
-                    }
-                    finally {
-                        pageMem.releasePage(grpId, partMetaId, partMetaPage);
-                    }
-                }
-                finally {
-                    ctx.database().checkpointReadUnlock();
-                }
-            }
-            else if (recoverState != null) {
-                GridDhtLocalPartition part = topology().forceCreatePartition(p);
-
-                offheap().onPartitionInitialCounterUpdated(p, recoverState.updateCounter());
-
-                updateState(part, recoverState.stateId());
-
-                processed++;
-
-                if (log.isDebugEnabled())
-                    log.debug("Restored partition state (from WAL) " +
-                        "[grp=" + cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                        ", updCntr=" + part.initialUpdateCounter() + "]");
-            }
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("Skipping partition on recovery (no page store OR wal state) " +
-                        "[grp=" + cacheOrGroupName() + ", p=" + p + "]");
-            }
-        }
-
-        partitionStatesRestored = true;
-
-        return processed;
-    }
-
-    /**
-     * @param part Partition to restore state for.
-     * @param stateId State enum ordinal.
-     * @return Updated flag.
-     */
-    private boolean updateState(GridDhtLocalPartition part, int stateId) {
-        if (stateId != -1) {
-            GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
-
-            assert state != null;
-
-            part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
      * @return {@code True} if current cache group is in recovery mode.
      */
     public boolean isRecoveryMode() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 23c3623..ce81468 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -106,6 +106,8 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
@@ -5438,6 +5440,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         private final Map<Integer, QuerySchema> querySchemas = new ConcurrentHashMap<>();
 
         /** {@inheritDoc} */
+        @Override public void onBaselineChange() {
+            onKernalStopCaches(true);
+
+            stopCaches(true);
+
+            sharedCtx.database().cleanupRestoredCaches();
+        }
+
+        /** {@inheritDoc} */
         @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
             restoreCacheConfigurations();
         }
@@ -5449,13 +5460,44 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+        @Override public void afterBinaryMemoryRestore(GridCacheDatabaseSharedManager.RestoreBinaryState binaryState) throws IgniteCheckedException {
             for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
                 startCacheInRecoveryMode(cacheDescriptor);
 
                 querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public void afterLogicalUpdatesApplied(GridCacheDatabaseSharedManager.RestoreLogicalState logicalState) throws IgniteCheckedException {
+            restorePartitionStates(cacheGroups(), logicalState.partitionRecoveryStates());
+        }
+
+        /**
+         * @param forGroups Cache groups.
+         * @param partitionStates Partition states.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void restorePartitionStates(
+            Collection<CacheGroupContext> forGroups,
+            Map<GroupPartitionId, PartitionRecoverState> partitionStates
+        ) throws IgniteCheckedException {
+            long startRestorePart = U.currentTimeMillis();
+
+            if (log.isInfoEnabled())
+                log.info("Restoring partition state for local groups.");
+
+            long totalProcessed = 0;
+
+            for (CacheGroupContext grp : forGroups)
+                totalProcessed += grp.offheap().restorePartitionStates(partitionStates);
+
+            if (log.isInfoEnabled())
+                log.info("Finished restoring partition state for local groups [" +
+                    "groupsProcessed=" + forGroups.size() +
+                    ", partitionsProcessed=" + totalProcessed +
+                    ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 2cf302f..db09a89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -31,6 +31,8 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
@@ -82,6 +84,15 @@ public interface IgniteCacheOffheapManager {
     public void stop();
 
     /**
+     * Pre-creates partitions that reside in page memory or WAL and restores their state.
+     *
+     * @param partitionRecoveryStates Partition recovery states.
+     * @return Number of processed partitions.
+     * @throws IgniteCheckedException If failed.
+     */
+    long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException;
+
+    /**
      * Partition counter update callback. May be overridden by plugin-provided subclasses.
      *
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 08ce978..6835795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -60,6 +60,8 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapt
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
@@ -258,6 +260,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
+    @Override public long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException {
+        return 0; // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onKernalStop() {
         busyLock.block();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 44fc266..2e792f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -890,15 +891,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             cctx.exchange().exchangerBlockingSectionBegin();
 
             try {
-                // Stop all recovered caches and groups.
-                cctx.cache().onKernalStopCaches(true);
+                List<DatabaseLifecycleListener> listeners = cctx.kernalContext().internalSubscriptionProcessor()
+                    .getDatabaseListeners();
 
-                cctx.cache().stopCaches(true);
-
-                cctx.database().cleanupRestoredCaches();
-
-                // Set initial node started marker.
-                cctx.database().nodeStart(null);
+                for (DatabaseLifecycleListener lsnr : listeners)
+                    lsnr.onBaselineChange();
             }
             finally {
                 cctx.exchange().exchangerBlockingSectionEnd();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
index 1f7ba84..6762109 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteCheckedException;
 /**
  *
  */
+@SuppressWarnings("RedundantThrows")
 public interface DatabaseLifecycleListener {
     /**
      * Callback executed when data regions become to start-up.
@@ -29,7 +30,15 @@ public interface DatabaseLifecycleListener {
      * @param mgr Database shared manager.
      * @throws IgniteCheckedException If failed.
      */
-    default void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+    public default void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
+
+    /**
+     * Callback executed when the node detects that baseline topology has changed and the node is not in that baseline.
+     * It is useful for cleaning up and invalidating all data restored up to that moment.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public default void onBaselineChange() throws IgniteCheckedException {}
 
     /**
      * Callback executed right before node become perform binary recovery.
@@ -37,30 +46,46 @@ public interface DatabaseLifecycleListener {
      * @param mgr Database shared manager.
      * @throws IgniteCheckedException If failed.
      */
-    default void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+    public default void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
 
     /**
      * Callback executed when binary memory has fully restored and WAL logging is resumed.
      *
-     * @param mgr Database shared manager.
+     * @param binaryState Result of binary recovery.
      * @throws IgniteCheckedException If failed.
      */
-    default void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+    public default void afterBinaryMemoryRestore(GridCacheDatabaseSharedManager.RestoreBinaryState binaryState)
+        throws IgniteCheckedException {}
 
     /**
+     * Callback executed when all logical updates have been applied and page memory is in a fully consistent state.
      *
-     * @param mgr
-     * @throws IgniteCheckedException
+     * @param logicalState Result of logical recovery.
+     * @throws IgniteCheckedException If failed.
      */
-    default void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+    public default void afterLogicalUpdatesApplied(GridCacheDatabaseSharedManager.RestoreLogicalState logicalState)
+        throws IgniteCheckedException {}
 
     /**
+     * Callback executed when all physical updates are applied and we are ready to write new physical records
+     * during logical recovery.
+     *
      * @param mgr Database shared manager.
+     * @throws IgniteCheckedException If failed.
      */
-    default void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {};
+    public default void beforeResumeWalLogging(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
 
     /**
+     * Callback executed after all data regions are initialized.
+     *
+     * @param mgr Database shared manager.
+     */
+    public default void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {}
+
+    /**
+     * Callback executed before the shared manager is stopped.
+     *
      * @param mgr Database shared manager.
      */
-    default void beforeStop(IgniteCacheDatabaseSharedManager mgr) {};
+    public default void beforeStop(IgniteCacheDatabaseSharedManager mgr) {}
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 019d1aa..c74972a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -93,7 +93,6 @@ import org.apache.ignite.internal.pagemem.wal.record.CacheState;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
@@ -133,6 +132,7 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.Partiti
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
@@ -153,6 +153,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteOutClosure;
@@ -167,7 +168,6 @@ import org.jsr166.ConcurrentLinkedHashMap;
 
 import static java.nio.file.StandardOpenOption.READ;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
@@ -192,9 +192,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private final boolean skipSync = IgniteSystemProperties.getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
 
     /** */
-    private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
-
-    /** */
     private final int walRebalanceThreshold = IgniteSystemProperties.getInteger(
         IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
 
@@ -218,9 +215,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Checkpoint file name pattern. */
     public static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
 
-    /** Node started file suffix. */
-    public static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin";
-
     /** */
     private static final String MBEAN_NAME = "DataStorageMetrics";
 
@@ -362,6 +356,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Timeout for checkpoint read lock acquisition in milliseconds. */
     private volatile long checkpointReadLockTimeout;
 
+    /** Flag that enables logging of additional information about partitions during recovery phases. */
+    private final boolean recoveryVerboseLogging = IgniteSystemProperties.getBoolean(
+            IgniteSystemProperties.IGNITE_RECOVERY_VERBOSE_LOGGING, true);
+
     /**
      * @param ctx Kernal context.
      */
@@ -484,6 +482,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         final GridKernalContext kernalCtx = cctx.kernalContext();
 
         if (!kernalCtx.clientNode()) {
+            kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
+
             checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
 
             cpHistory = new CheckpointHistory(kernalCtx);
@@ -543,7 +543,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 continue;
 
             if (log.isInfoEnabled())
-                log.info("Page memory " + region + " for " + grpDesc + " has invalidated.");
+                log.info("Page memory " + region.config().getName() + " for " + grpDesc + " has invalidated.");
 
             int partitions = grpDesc.config().getAffinity().partitions();
 
@@ -895,31 +895,40 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @return Last seen WAL pointer during binary memory recovery.
      * @throws IgniteCheckedException If failed.
      */
-    private WALPointer restoreBinaryMemory(Predicate<Integer> cacheGroupsPredicate) throws IgniteCheckedException {
-        assert !cctx.kernalContext().clientNode();
-
+    private RestoreBinaryState restoreBinaryMemory(Predicate<Integer> cacheGroupsPredicate) throws IgniteCheckedException {
         long time = System.currentTimeMillis();
 
-        checkpointReadLock();
-
         try {
+            log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet());
+
+            for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
+                lsnr.beforeBinaryMemoryRestore(this);
+
             CheckpointStatus status = readCheckpointStatus();
 
             // First, bring memory to the last consistent checkpoint state if needed.
             // This method should return a pointer to the last valid record in the WAL.
-            WALPointer restored = performBinaryMemoryRestore(status, cacheGroupsPredicate, true);
+            RestoreBinaryState binaryState = performBinaryMemoryRestore(status, cacheGroupsPredicate, true);
+
+            WALPointer restored = binaryState.lastReadRecordPointer();
 
             if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
                 throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " +
                     "[tailWalPtr=" + restored + ", endPtr=" + status.endPtr + ']');
             }
+            else if (restored != null)
+                U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');
 
-            nodeStart(restored);
+            // Wal logging is now available.
+            cctx.wal().resumeLogging(restored);
+
+            for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
+                lsnr.afterBinaryMemoryRestore(binaryState);
 
             if (log.isInfoEnabled())
                 log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
 
-            return restored;
+            return binaryState;
         }
         catch (IgniteCheckedException e) {
             if (X.hasCause(e, StorageException.class, IOException.class))
@@ -927,97 +936,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             throw e;
         }
-        finally {
-            checkpointReadUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void nodeStart(@Nullable WALPointer ptr) throws IgniteCheckedException {
-        FileWALPointer p = (FileWALPointer)(ptr == null ? CheckpointStatus.NULL_PTR : ptr);
-
-        String fileName = U.currentTimeMillis() + NODE_STARTED_FILE_NAME_SUFFIX;
-        String tmpFileName = fileName + FilePageStoreManager.TMP_SUFFIX;
-
-        ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
-        buf.order(ByteOrder.nativeOrder());
-
-        try {
-            try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), tmpFileName).toFile(),
-                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
-                buf.putLong(p.index());
-
-                buf.putInt(p.fileOffset());
-
-                buf.putInt(p.length());
-
-                buf.flip();
-
-                io.writeFully(buf);
-
-                buf.clear();
-
-                io.force(true);
-            }
-
-            Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
-        }
-        catch (IOException e) {
-            throw new StorageException("Failed to write node start marker: " + ptr, e);
-        }
-    }
-
-    /**
-     * Collects memory recovery pointers from node started files. See {@link #nodeStart(WALPointer)}.
-     * Each pointer associated with timestamp extracted from file.
-     * Tuples are sorted by timestamp.
-     *
-     * @return Sorted list of tuples (node started timestamp, memory recovery pointer).
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public List<T2<Long, WALPointer>> nodeStartedPointers() throws IgniteCheckedException {
-        List<T2<Long, WALPointer>> res = new ArrayList<>();
-
-        try (DirectoryStream<Path> nodeStartedFiles = Files.newDirectoryStream(
-            cpDir.toPath(),
-            path -> path.toFile().getName().endsWith(NODE_STARTED_FILE_NAME_SUFFIX))
-        ) {
-            ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
-            buf.order(ByteOrder.nativeOrder());
-
-            for (Path path : nodeStartedFiles) {
-                File f = path.toFile();
-
-                String name = f.getName();
-
-                Long ts = Long.valueOf(name.substring(0, name.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
-
-                try (FileIO io = ioFactory.create(f, READ)) {
-                    io.readFully(buf);
-
-                    buf.flip();
-
-                    FileWALPointer ptr = new FileWALPointer(
-                        buf.getLong(), buf.getInt(), buf.getInt());
-
-                    res.add(new T2<>(ts, ptr));
-
-                    buf.clear();
-                }
-                catch (IOException e) {
-                    throw new StorageException("Failed to read node started marker file: " + f.getAbsolutePath(), e);
-                }
-            }
-        }
-        catch (IOException e) {
-            throw new StorageException("Failed to retreive node started files.", e);
-        }
-
-        // Sort start markers by file timestamp.
-        res.sort(Comparator.comparingLong(IgniteBiTuple::get1));
-
-        return res;
     }
 
     /** {@inheritDoc} */
@@ -1357,7 +1275,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     cctx.database().checkpointReadLock();
 
                     try {
-                        cacheGroup.restorePartitionStates(Collections.emptyMap());
+                        cacheGroup.offheap().restorePartitionStates(Collections.emptyMap());
 
                         if (cacheGroup.localStartVersion().equals(fut.initialVersion()))
                             cacheGroup.topology().afterStateRestored(fut.initialVersion());
@@ -1504,14 +1422,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
                 CacheGroupContext grp = tup.get1();
 
-                if (grp.affinityNode()) {
-                    try {
-                        cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
-                            "[cache=" + grp.cacheOrGroupName() + "]", e);
-                    }
+                try {
+                    cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
+                        "[cache=" + grp.cacheOrGroupName() + "]", e);
                 }
             }
         }
@@ -1978,32 +1894,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             // Preform early regions startup before restoring state.
             initAndStartRegions(kctx.config().getDataStorageConfiguration());
 
-            for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx))
-                lsnr.beforeBinaryMemoryRestore(this);
-
-            log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet());
-
-            cctx.pageStore().initializeForMetastorage();
-
             // Restore binary memory for all not WAL disabled cache groups.
-            WALPointer restored = restoreBinaryMemory(
+            restoreBinaryMemory(
                     g -> !initiallyGlobalWalDisabledGrps.contains(g) && !initiallyLocalWalDisabledGrps.contains(g)
             );
 
-            if (restored != null)
-                U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');
-
-            for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx))
-                lsnr.afterBinaryMemoryRestore(this);
+            if (recoveryVerboseLogging && log.isInfoEnabled()) {
+                log.info("Partition states information after BINARY RECOVERY phase:");
 
-            cctx.wal().resumeLogging(restored);
-
-            // We should log this record to ensure that node start marker pointer will be found in compacted segment.
-            cctx.wal().log(new MemoryRecoveryRecord(System.currentTimeMillis()));
-
-            assert metaStorage == null;
-
-            metaStorage = createMetastorage(false);
+                dumpPartitionsInfo(cctx, log);
+            }
 
             CheckpointStatus status = readCheckpointStatus();
 
@@ -2014,8 +1914,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     false
             );
 
-            // Restore state for all groups.
-            restorePartitionStates(cctx.cache().cacheGroups(), logicalState.partitionRecoveryStates);
+            if (recoveryVerboseLogging && log.isInfoEnabled()) {
+                log.info("Partition states information after LOGICAL RECOVERY phase:");
+
+                dumpPartitionsInfo(cctx, log);
+            }
 
             walTail = tailPointer(logicalState.lastRead);
 
@@ -2056,32 +1959,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
-     * @param forGroups Cache groups.
-     * @param partitionStates Partition states.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void restorePartitionStates(
-        Collection<CacheGroupContext> forGroups,
-        Map<GroupPartitionId, PartitionRecoverState> partitionStates
-    ) throws IgniteCheckedException {
-        long startRestorePart = U.currentTimeMillis();
-
-        if (log.isInfoEnabled())
-            log.info("Restoring partition state for local groups.");
-
-        long totalProcessed = 0;
-
-        for (CacheGroupContext grp : forGroups)
-            totalProcessed += grp.restorePartitionStates(partitionStates);
-
-        if (log.isInfoEnabled())
-            log.info("Finished restoring partition state for local groups [" +
-                "groupsProcessed" + forGroups.size() +
-                "partitionsProcessed=" + totalProcessed +
-                ", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
-    }
-
-    /**
      * Called when all partitions have been fully restored and pre-created on node start.
      *
      * Starts checkpointing process and initiates first checkpoint.
@@ -2108,7 +1985,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @throws IgniteCheckedException If failed.
      * @throws StorageException In case I/O error occurred during operations with storage.
      */
-    private WALPointer performBinaryMemoryRestore(
+    private RestoreBinaryState performBinaryMemoryRestore(
         CheckpointStatus status,
         Predicate<Integer> cacheGroupsPredicate,
         boolean finalizeState
@@ -2257,7 +2134,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         if (!finalizeState)
             return null;
 
-        WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();
+        FileWALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();
 
         if (status.needRestoreMemory()) {
             if (restoreBinaryState.needApplyBinaryUpdate())
@@ -2274,7 +2151,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         cpHistory.initialize(retreiveHistory());
 
-        return lastReadPtr != null ? lastReadPtr.next() : null;
+        // Move the pointer position to the end of the last read record.
+        restoreBinaryState.lastRead = lastReadPtr != null ? lastReadPtr.next() : lastReadPtr;
+
+        return restoreBinaryState;
     }
 
     /**
@@ -2311,61 +2191,61 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @param it WalIterator.
      * @param recPredicate Wal record filter.
      * @param entryPredicate Entry filter.
-     * @param partitionRecoveryStates Partition to restore state.
      */
     public void applyUpdatesOnRecovery(
         @Nullable WALIterator it,
-        IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate,
-        IgnitePredicate<DataEntry> entryPredicate,
-        Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates
+        IgniteBiPredicate<WALPointer, WALRecord> recPredicate,
+        IgnitePredicate<DataEntry> entryPredicate
     ) throws IgniteCheckedException {
+        if (it == null)
+            return;
+
         cctx.walState().runWithOutWAL(() -> {
-            if (it != null) {
-                while (it.hasNext()) {
-                    IgniteBiTuple<WALPointer, WALRecord> next = it.next();
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> next = it.next();
 
-                    WALRecord rec = next.get2();
+                WALRecord rec = next.get2();
 
-                    if (!recPredicate.apply(next))
-                        break;
+                if (!recPredicate.apply(next.get1(), rec))
+                    break;
 
-                    switch (rec.type()) {
-                        case MVCC_DATA_RECORD:
+                switch (rec.type()) {
+                    case MVCC_DATA_RECORD:
                         case DATA_RECORD:
-                            checkpointReadLock();
+                        checkpointReadLock();
 
-                            try {
-                                DataRecord dataRec = (DataRecord)rec;
+                        try {
+                            DataRecord dataRec = (DataRecord)rec;
 
-                                for (DataEntry dataEntry : dataRec.writeEntries()) {
-                                    if (entryPredicate.apply(dataEntry)) {
-                                        checkpointReadLock();
+                            for (DataEntry dataEntry : dataRec.writeEntries()) {
+                                if (entryPredicate.apply(dataEntry)) {
+                                    checkpointReadLock();
 
-                                        try {
-                                            int cacheId = dataEntry.cacheId();
+                                    try {
+                                        int cacheId = dataEntry.cacheId();
 
-                                            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                                            if (cacheCtx != null)
-                                                applyUpdate(cacheCtx, dataEntry);
-                                            else if (log != null)
-                                                log.warning("Cache is not started. Updates cannot be applied " +
-                                                    "[cacheId=" + cacheId + ']');
-                                        }
-                                        finally {
-                                            checkpointReadUnlock();
-                                        }
+                                        if (cacheCtx != null)
+                                            applyUpdate(cacheCtx, dataEntry);
+                                        else if (log != null)
+                                            log.warning("Cache is not started. Updates cannot be applied " +
+                                                "[cacheId=" + cacheId + ']');
+                                    }
+                                    finally {
+                                        checkpointReadUnlock();
                                     }
                                 }
                             }
-                            catch (IgniteCheckedException e) {
-                                throw new IgniteException(e);
-                            }
-                            finally {
-                                checkpointReadUnlock();
-                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                        finally {
+                            checkpointReadUnlock();
+                        }
 
-                            break;
+                        break;
 
                         case MVCC_TX_RECORD:
                             checkpointReadLock();
@@ -2386,23 +2266,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                             break;
 
-                        default:
-                            // Skip other records.
-                    }
+                    default:
+                        // Skip other records.
                 }
             }
-
-            checkpointReadLock();
-
-            try {
-                restorePartitionStates(cctx.cache().cacheGroups(), partitionRecoveryStates);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                checkpointReadUnlock();
-            }
         });
     }
 
@@ -2538,6 +2405,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             log.info("Finished applying WAL changes [updatesApplied=" + applied +
                 ", time=" + (U.currentTimeMillis() - start) + " ms]");
 
+        for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
+            lsnr.afterLogicalUpdatesApplied(restoreLogicalState);
+
         return restoreLogicalState;
     }
 
@@ -4223,12 +4093,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      *
      */
-    private static class CheckpointStatus {
+    public static class CheckpointStatus {
         /** Null checkpoint UUID. */
         private static final UUID NULL_UUID = new UUID(0L, 0L);
 
         /** Null WAL pointer. */
-        private static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);
+        public static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);
 
         /** */
         private long cpStartTs;
@@ -4693,6 +4563,97 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Dumps partitions info (see {@link #dumpPartitionsInfo(CacheGroupContext, IgniteLogger)})
+     * for all non-local cache groups with persistence enabled.
+     *
+     * @param cctx Shared context.
+     * @param log Logger.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static void dumpPartitionsInfo(GridCacheSharedContext cctx, IgniteLogger log) throws IgniteCheckedException {
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal() || !grp.persistenceEnabled())
+                continue;
+
+            dumpPartitionsInfo(grp, log);
+        }
+    }
+
+    /**
+     * Retrieves meta information about the partitions of the given {@code grp} cache group from page memory
+     * and dumps it to the log at INFO level.
+     *
+     * @param grp Cache group.
+     * @param log Logger.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static void dumpPartitionsInfo(CacheGroupContext grp, IgniteLogger log) throws IgniteCheckedException {
+        PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
+
+        IgnitePageStoreManager pageStore = grp.shared().pageStore();
+
+        assert pageStore != null : "Persistent cache should have initialize page store manager.";
+
+        for (int p = 0; p < grp.affinity().partitions(); p++) {
+            if (!pageStore.exists(grp.groupId(), p))
+                continue;
+
+            pageStore.ensure(grp.groupId(), p);
+
+            if (pageStore.pages(grp.groupId(), p) <= 1) {
+                log.info("Partition [id=" + p + ", state=N/A (only file header) ]");
+
+                continue;
+            }
+
+            long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
+            long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
+
+            try {
+                long pageAddr = pageMem.readLock(grp.groupId(), partMetaId, partMetaPage);
+
+                try {
+                    PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+
+                    GridDhtPartitionState partitionState = GridDhtPartitionState.fromOrdinal(io.getPartitionState(pageAddr));
+
+                    String state = partitionState != null ? partitionState.toString() : "N/A";
+
+                    long updateCounter = io.getUpdateCounter(pageAddr);
+                    long size = io.getSize(pageAddr);
+
+                    log.info("Partition [grp=" + grp.cacheOrGroupName()
+                            + ", id=" + p
+                            + ", state=" + state
+                            + ", counter=" + updateCounter
+                            + ", size=" + size + "]");
+                }
+                finally {
+                    pageMem.readUnlock(grp.groupId(), partMetaId, partMetaPage);
+                }
+            }
+            finally {
+                pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
+            }
+        }
+    }
+
+    /**
+     * Recovery lifecycle for read-write metastorage.
+     */
+    private class MetastorageRecoveryLifecycle implements DatabaseLifecycleListener {
+        @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
+            cctx.pageStore().initializeForMetastorage();
+        }
+
+        @Override public void afterBinaryMemoryRestore(RestoreBinaryState binaryState) throws IgniteCheckedException {
+            assert metaStorage == null;
+
+            metaStorage = createMetastorage(false);
+        }
+    }
+
+    /**
      * Abstract class for create restore context.
      */
     private abstract class RestoreStateContext {
@@ -4764,12 +4725,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                                 // Filter data entries by group id.
                                 List<DataEntry> filteredEntries = dataRecord.writeEntries().stream()
                                         .filter(entry -> {
-                                            if (entry == null)
-                                                return false;
-
                                             int cacheId = entry.cacheId();
 
-                                            return cctx != null && cctx.cacheContext(cacheId) != null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId());
+                                            return cctx.cacheContext(cacheId) != null && cacheGroupPredicate.test(cctx.cacheContext(cacheId).groupId());
                                         })
                                         .collect(Collectors.toList());
 
@@ -4805,7 +4763,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          *
          * @return Last read WAL record pointer.
          */
-        public WALPointer lastReadRecordPointer() {
+        public FileWALPointer lastReadRecordPointer() {
             return lastRead;
         }
 
@@ -4823,7 +4781,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      * Restore memory context. Tracks the safety of binary recovery.
      */
-    private class RestoreBinaryState extends RestoreStateContext {
+    public class RestoreBinaryState extends RestoreStateContext {
         /** Checkpoint status. */
         private final CheckpointStatus status;
 
@@ -4898,7 +4856,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      * Restore logical state context. Tracks the safety of logical recovery.
      */
-    private class RestoreLogicalState extends RestoreStateContext {
+    public class RestoreLogicalState extends RestoreStateContext {
         /** States of partitions recovered during applying logical updates. */
         private final Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates = new HashMap<>();
 
@@ -4908,6 +4866,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         public RestoreLogicalState(long lastArchivedSegment, Predicate<Integer> cacheGroupsPredicate) {
             super(lastArchivedSegment, cacheGroupsPredicate, false);
         }
+
+        /**
+         * @return Unmodifiable view of the partition states recovered while applying logical updates.
+         */
+        public Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates() {
+            return Collections.unmodifiableMap(partitionRecoveryStates);
+        }
     }
 
     /** Indicates checkpoint read lock acquisition failure which did not lead to node invalidation. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index f24900f..73cb878 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO;
@@ -112,6 +113,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
     /** */
     private ReuseListImpl reuseList;
 
+    /** Flag indicating that all partitions of the group have restored their state from page memory / disk. */
+    private volatile boolean partitionStatesRestored;
+
     /** {@inheritDoc} */
     @Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
         // No-op. Per-partition PendingTree should be used.
@@ -396,6 +400,142 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             tryAddEmptyPartitionToSnapshot(store, ctx);
     }
 
+    /** {@inheritDoc} */
+    @Override public long restorePartitionStates(Map<GroupPartitionId, PartitionRecoverState> partitionRecoveryStates) throws IgniteCheckedException {
+        if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled())
+            return 0;
+
+        if (partitionStatesRestored)
+            return 0;
+
+        long processed = 0;
+
+        PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
+
+        for (int p = 0; p < grp.affinity().partitions(); p++) {
+            PartitionRecoverState recoverState = partitionRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
+
+            if (ctx.pageStore().exists(grp.groupId(), p)) {
+                ctx.pageStore().ensure(grp.groupId(), p);
+
+                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition on recovery (pages less than 1) " +
+                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
+
+                    continue;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Creating partition on recovery (exists in page store) " +
+                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
+
+                processed++;
+
+                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+
+                onPartitionInitialCounterUpdated(p, 0);
+
+                ctx.database().checkpointReadLock();
+
+                try {
+                    long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
+                    long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
+
+                    try {
+                        long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
+
+                        boolean changed = false;
+
+                        try {
+                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+
+                            if (recoverState != null) {
+                                io.setPartitionState(pageAddr, (byte) recoverState.stateId());
+
+                                changed = updateState(part, recoverState.stateId());
+
+                                if (recoverState.stateId() == GridDhtPartitionState.OWNING.ordinal()
+                                    || (recoverState.stateId() == GridDhtPartitionState.MOVING.ordinal()
+                                    && part.initialUpdateCounter() < recoverState.updateCounter())) {
+                                    part.initialUpdateCounter(recoverState.updateCounter());
+
+                                    changed = true;
+                                }
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Restored partition state (from WAL) " +
+                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                        ", updCntr=" + part.initialUpdateCounter() + "]");
+                            }
+                            else {
+                                int stateId = (int) io.getPartitionState(pageAddr);
+
+                                changed = updateState(part, stateId);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Restored partition state (from page memory) " +
+                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                        ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + "]");
+                            }
+                        }
+                        finally {
+                            pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
+                        }
+                    }
+                    finally {
+                        pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
+                    }
+                }
+                finally {
+                    ctx.database().checkpointReadUnlock();
+                }
+            }
+            else if (recoverState != null) {
+                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+
+                onPartitionInitialCounterUpdated(p, recoverState.updateCounter());
+
+                updateState(part, recoverState.stateId());
+
+                processed++;
+
+                if (log.isDebugEnabled())
+                    log.debug("Restored partition state (from WAL) " +
+                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                        ", updCntr=" + part.initialUpdateCounter() + "]");
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping partition on recovery (no page store OR wal state) " +
+                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
+            }
+        }
+
+        partitionStatesRestored = true;
+
+        return processed;
+    }
+
+    /**
+     * @param part Partition to restore state for.
+     * @param stateId State enum ordinal.
+     * @return {@code true} if the partition state was restored, {@code false} if {@code stateId} is {@code -1}.
+     */
+    private boolean updateState(GridDhtLocalPartition part, int stateId) {
+        if (stateId != -1) {
+            GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
+
+            assert state != null;
+
+            part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
+
+            return true;
+        }
+
+        return false;
+    }
+
     /**
      * Check that we need to snapshot this partition and add it to map.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 4966bca..7fc70d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -697,21 +697,11 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     }
 
     /**
-     * Creates file with current timestamp and specific "node-started.bin" suffix
-     * and writes into memory recovery pointer.
-     *
-     * @param ptr Memory recovery wal pointer.
-     */
-    public void nodeStart(@Nullable WALPointer ptr) throws IgniteCheckedException {
-        // No-op.
-    }
-
-    /**
      * @param memPlcName Name of {@link DataRegion} to obtain {@link DataRegionMetrics} for.
      * @return {@link DataRegionMetrics} snapshot for specified {@link DataRegion} or {@code null} if
      * no {@link DataRegion} is configured for specified name.
      */
-    @Nullable public DataRegionMetrics memoryMetrics(String memPlcName) {
+    public @Nullable DataRegionMetrics memoryMetrics(String memPlcName) {
         if (!F.isEmpty(memMetricsMap)) {
             DataRegionMetrics memMetrics = memMetricsMap.get(memPlcName);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
index 6ea7e00..5e59178 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
@@ -80,7 +80,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
     }
 
     /** {@inheritDoc} */
-    @Override public WALPointer next() {
+    @Override public FileWALPointer next() {
         if (len == 0)
             throw new IllegalStateException("Failed to calculate next WAL pointer " +
                 "(this pointer is a terminal): " + this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b48a291e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 183e147..926e403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -230,7 +230,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer {
         if (encSpi instanceof NoopEncryptionSpi)
             return false;
 
-        if (!(rec instanceof WalRecordCacheGroupAware) || rec instanceof MetastoreDataRecord)
+        if (!(rec instanceof WalRecordCacheGroupAware))
             return false;
 
         return needEncryption(((WalRecordCacheGroupAware)rec).groupId());