You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/01/16 21:47:32 UTC

[ignite] branch master updated: IGNITE-10640 Create cluster-wide MetaStorage analogue - Fixes #5637.

This is an automated email from the ASF dual-hosted git repository.

dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a2e1230  IGNITE-10640 Create cluster-wide MetaStorage analogue - Fixes #5637.
a2e1230 is described below

commit a2e1230aeec734aa0b09cc5688e266c645666ada
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Dec 25 14:01:50 2018 +0300

    IGNITE-10640 Create cluster-wide MetaStorage analogue - Fixes #5637.
    
    Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |    6 +
 .../org/apache/ignite/internal/GridComponent.java  |    5 +-
 .../apache/ignite/internal/GridKernalContext.java  |   18 +-
 .../ignite/internal/GridKernalContextImpl.java     |   12 +
 .../org/apache/ignite/internal/IgniteKernal.java   |    2 +
 .../managers/communication/GridIoManager.java      |    8 +-
 .../managers/encryption/GridEncryptionManager.java |   18 +-
 .../IgniteAuthenticationProcessor.java             |   14 +-
 .../processors/cache/GridCacheIoManager.java       |   23 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java |   36 +-
 .../GridCacheDatabaseSharedManager.java            |   36 +-
 .../cache/persistence/metastorage/MetaStorage.java |  132 ++-
 .../metastorage/ReadOnlyMetastorage.java           |   21 +-
 .../metastorage/ReadWriteMetastorage.java          |    3 +
 .../wal/reader/StandaloneGridKernalContext.java    |    6 +
 .../cluster/GridClusterStateProcessor.java         |   12 +-
 .../metastorage/DistributedMetaStorage.java        |   74 ++
 .../DistributedMetaStorageListener.java            |   45 +
 .../DistributedMetastorageLifecycleListener.java   |   43 +
 .../ReadableDistributedMetaStorage.java            |   62 ++
 .../persistence/DistributedMetaStorageBridge.java  |   91 ++
 .../DistributedMetaStorageCasAckMessage.java}      |   34 +-
 .../DistributedMetaStorageCasMessage.java          |   67 ++
 .../DistributedMetaStorageClusterNodeData.java     |   52 +
 .../DistributedMetaStorageHistoryItem.java         |   76 ++
 .../persistence/DistributedMetaStorageImpl.java    | 1169 ++++++++++++++++++++
 .../DistributedMetaStorageJoiningNodeData.java}    |   33 +-
 .../DistributedMetaStorageUpdateAckMessage.java    |   97 ++
 .../DistributedMetaStorageUpdateMessage.java       |  119 ++
 .../persistence/DistributedMetaStorageUtil.java    |  109 ++
 .../persistence/DistributedMetaStorageVersion.java |  159 +++
 .../EmptyDistributedMetaStorageBridge.java         |   64 ++
 ...InMemoryCachedDistributedMetaStorageBridge.java |  109 ++
 .../NotAvailableDistributedMetaStorageBridge.java  |   62 ++
 .../ReadOnlyDistributedMetaStorageBridge.java      |  212 ++++
 .../persistence/StartupExtras.java}                |   19 +-
 .../WritableDistributedMetaStorageBridge.java      |  163 +++
 .../GridInternalSubscriptionProcessor.java         |   25 +-
 .../ignite/marshaller/jdk/JdkMarshaller.java       |    3 +
 .../encryption/EncryptedCacheDestroyTest.java      |    4 +-
 .../metastorage/IgniteMetaStorageBasicTest.java    |   75 +-
 .../DistributedMetaStoragePersistentTest.java      |  698 ++++++++++++
 .../metastorage/DistributedMetaStorageTest.java    |  366 ++++++
 .../ignite/testsuites/IgnitePdsTestSuite.java      |    4 +
 .../persistence/db/wal/IgniteWalRecoveryTest.java  |   28 +-
 45 files changed, 4212 insertions(+), 202 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 3b3a00b..b20d8ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -29,6 +29,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
 import org.apache.ignite.internal.processors.rest.GridRestCommand;
 import org.apache.ignite.internal.util.GridLogThrottle;
 import org.apache.ignite.stream.StreamTransformer;
@@ -1089,6 +1090,11 @@ public final class IgniteSystemProperties {
         "IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN";
 
     /**
+     * Maximum number of bytes that can be stored in the history of {@link DistributedMetaStorage} updates.
+     */
+    public static final String IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES = "IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 1f9e02e..2e86198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -73,7 +73,10 @@ public interface GridComponent {
         ENCRYPTION_MGR,
 
         /** Service processor. */
-        SERVICE_PROC
+        SERVICE_PROC,
+
+        /** Distributed MetaStorage processor. */
+        META_STORAGE;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 691fe37..9651290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -33,19 +33,16 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.failover.GridFailoverManager;
 import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
 import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
-import org.apache.ignite.internal.stat.IoStatisticsManager;
-import org.apache.ignite.internal.processors.compress.CompressionProcessor;
-import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
-import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
 import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
 import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
@@ -57,6 +54,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
 import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
 import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
@@ -68,14 +66,17 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor;
 import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
 import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
 import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
+import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
 import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.stat.IoStatisticsManager;
 import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
 import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.worker.WorkersRegistry;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
@@ -206,6 +207,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public GridClusterStateProcessor state();
 
     /**
+     * Gets the distributed (cluster-wide) metastorage.
+     *
+     * @return Distributed metastorage.
+     */
+    public DistributedMetaStorage distributedMetastorage();
+
+    /**
      * Gets task session processor.
      *
      * @return Session processor.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1219d00..cc18d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
 import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
 import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
 import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
@@ -218,6 +219,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     @GridToStringInclude
     private GridClusterStateProcessor stateProc;
 
+    /** Distributed (cluster-wide) metastorage. */
+    @GridToStringInclude
+    private DistributedMetaStorage distributedMetastorage;
+
     /** */
     @GridToStringInclude
     private GridTaskSessionProcessor sesProc;
@@ -602,6 +607,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
             cacheProc = (GridCacheProcessor)comp;
         else if (comp instanceof GridClusterStateProcessor)
             stateProc = (GridClusterStateProcessor)comp;
+        else if (comp instanceof DistributedMetaStorage)
+            distributedMetastorage = (DistributedMetaStorage)comp;
         else if (comp instanceof GridTaskSessionProcessor)
             sesProc = (GridTaskSessionProcessor)comp;
         else if (comp instanceof GridPortProcessor)
@@ -752,6 +759,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public DistributedMetaStorage distributedMetastorage() {
+        return distributedMetastorage;
+    }
+
+    /** {@inheritDoc} */
     @Override public GridTaskSessionProcessor session() {
         return sesProc;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 9633f89..30af4f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -149,6 +149,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
 import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
 import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
 import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
 import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
@@ -1035,6 +1036,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 startProcessor(new DataStructuresProcessor(ctx));
                 startProcessor(createComponent(PlatformProcessor.class, ctx));
                 startProcessor(new GridMarshallerMappingProcessor(ctx));
+                startProcessor(new DistributedMetaStorageImpl(ctx));
 
                 // Start plugins.
                 for (PluginProvider provider : ctx.plugins().allProviders()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 09c7b96..c1e1684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -205,11 +205,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final AtomicLong ioTestId = new AtomicLong();
 
     /** No-op runnable. */
-    private static final IgniteRunnable NOOP = new IgniteRunnable() {
-        @Override public void run() {
-            // No-op.
-        }
-    };
+    private static final IgniteRunnable NOOP = () -> {};
 
     /**
      * @param ctx Grid kernal context.
@@ -301,7 +297,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     throws IgniteCheckedException {
 
                     return new DirectMessageReader(msgFactory,
-                        rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER);
+                        rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : DIRECT_PROTO_VER);
                 }
             };
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index 5667676..dd2db02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -53,7 +52,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
@@ -62,6 +60,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -129,10 +128,6 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
     /** Prefix for a encryption group key in meta store. */
     public static final String ENCRYPTION_KEY_PREFIX = "grp-encryption-key-";
 
-    /** Encryption key predicate for meta store. */
-    private static final IgnitePredicate<String> ENCRYPTION_KEY_PREFIX_PRED =
-        (IgnitePredicate<String>)key -> key.startsWith(ENCRYPTION_KEY_PREFIX);
-
     /** Group encryption keys. */
     private final ConcurrentHashMap<Integer, Serializable> grpEncKeys = new ConcurrentHashMap<>();
 
@@ -551,18 +546,13 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
     /** {@inheritDoc} */
     @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
         try {
-            Map<String, ? extends Serializable> encKeys = metastorage.readForPredicate(ENCRYPTION_KEY_PREFIX_PRED);
-
-            if (encKeys.isEmpty())
-                return;
-
-            for (String key : encKeys.keySet()) {
+            metastorage.iterate(ENCRYPTION_KEY_PREFIX, (key, val) -> {
                 Integer grpId = Integer.valueOf(key.replace(ENCRYPTION_KEY_PREFIX, ""));
 
-                byte[] encGrpKey = (byte[])encKeys.get(key);
+                byte[] encGrpKey = (byte[])val;
 
                 grpEncKeys.putIfAbsent(grpId, getSpi().decryptKey(encGrpKey));
-            }
+            }, true);
 
             if (!grpEncKeys.isEmpty()) {
                 U.quietAndInfo(log, "Encryption keys loaded from metastore. [grps=" +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
index 98f4201..5a696f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -387,11 +386,11 @@ public class IgniteAuthenticationProcessor extends GridProcessorAdapter implemen
         if (!ctx.clientNode()) {
             users = new ConcurrentHashMap<>();
 
-            Map<String, User> readUsers = (Map<String, User>)metastorage.readForPredicate(
-                (IgnitePredicate<String>)key -> key != null && key.startsWith(STORE_USER_PREFIX));
+            metastorage.iterate(STORE_USER_PREFIX, (key, val) -> {
+                User u = (User)val;
 
-            for (User u : readUsers.values())
                 users.put(u.name(), u);
+            }, true);
         }
         else
             users = null;
@@ -1318,10 +1317,11 @@ public class IgniteAuthenticationProcessor extends GridProcessorAdapter implemen
                 sharedCtx.database().checkpointReadLock();
 
             try {
-                Map<String, User> existUsrs = (Map<String, User>)metastorage.readForPredicate(
-                    (IgnitePredicate<String>)key -> key != null && key.startsWith(STORE_USER_PREFIX));
+                Set<String> existUsrsKeys = new HashSet<>();
 
-                for (String key : existUsrs.keySet())
+                metastorage.iterate(STORE_USER_PREFIX, (key, val) -> existUsrsKeys.add(key), false);
+
+                for (String key : existUsrsKeys)
                     metastorage.remove(key);
 
                 for (User u : newUsrs)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 710931f..06c6267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1374,10 +1374,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param type Type of message.
      * @param c Handler.
      */
-    public void addCacheHandler(
+    public <Msg extends GridCacheMessage> void addCacheHandler(
         int hndId,
-        Class<? extends GridCacheMessage> type,
-        IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        Class<Msg> type,
+        IgniteBiInClosure<UUID, ? super Msg> c
+    ) {
         assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
 
         addHandler(hndId, type, c, cacheHandlers);
@@ -1388,10 +1389,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param type Type of message.
      * @param c Handler.
      */
-    public void addCacheGroupHandler(
+    public <Msg extends GridCacheGroupIdMessage> void addCacheGroupHandler(
         int hndId,
-        Class<? extends GridCacheGroupIdMessage> type,
-        IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+        Class<Msg> type,
+        IgniteBiInClosure<UUID, ? super Msg> c
+    ) {
         assert !type.isAssignableFrom(GridCacheIdMessage.class) : type;
 
         addHandler(hndId, type, c, grpHandlers);
@@ -1403,11 +1405,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param c Handler.
      * @param msgHandlers Message handlers.
      */
-    private void addHandler(
+    private <Msg extends GridCacheMessage> void addHandler(
         int hndId,
-        Class<? extends GridCacheMessage> type,
-        IgniteBiInClosure<UUID, ? extends GridCacheMessage> c,
-        MessageHandlers msgHandlers) {
+        Class<Msg> type,
+        IgniteBiInClosure<UUID, ? super Msg> c,
+        MessageHandlers msgHandlers
+    ) {
         int msgIdx = messageIndex(type);
 
         if (msgIdx != -1) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a536011..8350fe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -53,6 +53,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.TimeBag;
@@ -1063,7 +1065,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         assert req != null : exchActions;
 
-        DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState();
+        GridKernalContext kctx = cctx.kernalContext();
+
+        DiscoveryDataClusterState state = kctx.state().clusterState();
 
         if (state.transitionError() != null)
             exchangeLocE = state.transitionError();
@@ -1072,7 +1076,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (req.activate()) {
                 if (log.isInfoEnabled()) {
                     log.info("Start activation process [nodeId=" + cctx.localNodeId() +
-                        ", client=" + cctx.kernalContext().clientNode() +
+                        ", client=" + kctx.clientNode() +
                         ", topVer=" + initialVersion() + "]");
                 }
 
@@ -1093,7 +1097,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     try {
                         registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
 
-                        if (!cctx.kernalContext().clientNode())
+                        if (!kctx.clientNode())
                             cctx.cache().shutdownNotFinishedRecoveryCaches();
                     }
                     finally {
@@ -1102,13 +1106,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                     if (log.isInfoEnabled()) {
                         log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
-                            ", client=" + cctx.kernalContext().clientNode() +
+                            ", client=" + kctx.clientNode() +
                             ", topVer=" + initialVersion() + "]");
                     }
                 }
                 catch (Exception e) {
                     U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() +
-                        ", client=" + cctx.kernalContext().clientNode() +
+                        ", client=" + kctx.clientNode() +
                         ", topVer=" + initialVersion() + "]", e);
 
                     exchangeLocE = e;
@@ -1130,34 +1134,36 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             else {
                 if (log.isInfoEnabled()) {
                     log.info("Start deactivation process [nodeId=" + cctx.localNodeId() +
-                        ", client=" + cctx.kernalContext().clientNode() +
+                        ", client=" + kctx.clientNode() +
                         ", topVer=" + initialVersion() + "]");
                 }
 
                 cctx.exchange().exchangerBlockingSectionBegin();
 
                 try {
-                    cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext());
+                    ((IgniteChangeGlobalStateSupport)kctx.distributedMetastorage()).onDeActivate(kctx);
+
+                    kctx.dataStructures().onDeActivate(kctx);
 
                     if (cctx.kernalContext().service() instanceof GridServiceProcessor)
-                        ((GridServiceProcessor)cctx.kernalContext().service()).onDeActivate(cctx.kernalContext());
+                        ((GridServiceProcessor)kctx.service()).onDeActivate(cctx.kernalContext());
 
                     assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
 
                     registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
 
-                    cctx.kernalContext().encryption().onDeActivate(cctx.kernalContext());
+                    kctx.encryption().onDeActivate(kctx);
 
                     if (log.isInfoEnabled()) {
                         log.info("Successfully deactivated data structures, services and caches [" +
                             "nodeId=" + cctx.localNodeId() +
-                            ", client=" + cctx.kernalContext().clientNode() +
+                            ", client=" + kctx.clientNode() +
                             ", topVer=" + initialVersion() + "]");
                     }
                 }
                 catch (Exception e) {
                     U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() +
-                        ", client=" + cctx.kernalContext().clientNode() +
+                        ", client=" + kctx.clientNode() +
                         ", topVer=" + initialVersion() + "]", e);
 
                     exchangeLocE = e;
@@ -1181,13 +1187,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     cctx.affinity().onBaselineTopologyChanged(this, crd);
                 }
 
-                if (CU.isPersistenceEnabled(cctx.kernalContext().config()) && !cctx.kernalContext().clientNode())
-                    cctx.kernalContext().state().onBaselineTopologyChanged(req.baselineTopology(),
+                if (CU.isPersistenceEnabled(kctx.config()) && !kctx.clientNode())
+                    kctx.state().onBaselineTopologyChanged(req.baselineTopology(),
                         req.prevBaselineTopologyHistoryItem());
             }
             catch (Exception e) {
                 U.error(log, "Failed to change baseline topology [nodeId=" + cctx.localNodeId() +
-                    ", client=" + cctx.kernalContext().clientNode() +
+                    ", client=" + kctx.clientNode() +
                     ", topVer=" + initialVersion() + "]", e);
 
                 exchangeLocE = e;
@@ -1197,7 +1203,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
         }
 
-        return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+        return kctx.clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c85099f..d7ad8c1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -241,13 +241,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Prefix for meta store records which means that checkpoint entry for some group is not applicable for WAL rebalance. */
     private static final String CHECKPOINT_INAPPLICABLE_FOR_REBALANCE = "cp-wal-rebalance-inapplicable-";
 
-    /** WAL marker predicate for meta store. */
-    private static final IgnitePredicate<String> WAL_KEY_PREFIX_PRED = new IgnitePredicate<String>() {
-        @Override public boolean apply(String key) {
-            return key.startsWith(WAL_KEY_PREFIX);
-        }
-    };
-
     /** Timeout between partition file destroy and checkpoint to handle it. */
     private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
 
@@ -2214,9 +2207,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                             if (pageMem == null)
                                 break;
 
-                                // Here we do not require tag check because we may be applying memory changes after
-                                // several repetitive restarts and the same pages may have changed several times.
-                                long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, true);
+                            // Here we do not require tag check because we may be applying memory changes after
+                            // several repetitive restarts and the same pages may have changed several times.
+                            long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, true);
 
                             try {
                                 long pageAddr = pageMem.writeLock(grpId, pageId, page, true);
@@ -4616,23 +4609,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         assert metaStorage != null;
 
         try {
-            Set<String> keys = metaStorage.readForPredicate(WAL_KEY_PREFIX_PRED).keySet();
-
-            if (keys.isEmpty())
-                return;
-
-            for (String key : keys) {
+            metaStorage.iterate(WAL_KEY_PREFIX, (key, val) -> {
                 T2<Integer, Boolean> t2 = walKeyToGroupIdAndLocalFlag(key);
 
-                if (t2 == null)
-                    continue;
-
-                if (t2.get2())
-                    initiallyLocalWalDisabledGrps.add(t2.get1());
-                else
-                    initiallyGlobalWalDisabledGrps.add(t2.get1());
-            }
-
+                if (t2 != null) {
+                    if (t2.get2())
+                        initiallyLocalWalDisabledGrps.add(t2.get1());
+                    else
+                        initiallyGlobalWalDisabledGrps.add(t2.get1());
+                }
+            }, false);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to read cache groups WAL state.", e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 7ff8257..0906605 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -27,13 +27,14 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -72,7 +73,6 @@ import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.jetbrains.annotations.NotNull;
@@ -85,7 +85,7 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
 /**
  * General purpose key-value local-only storage.
  */
-public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, ReadWriteMetastorage {
+public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
     /** */
     public static final String METASTORAGE_CACHE_NAME = "MetaStorage";
 
@@ -139,10 +139,10 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
     private FreeListImpl freeList;
 
     /** */
-    private Map<String, byte[]> lastUpdates;
+    private SortedMap<String, byte[]> lastUpdates;
 
     /** */
-    private final Marshaller marshaller = new JdkMarshaller();
+    private final Marshaller marshaller = JdkMarshaller.DEFAULT;
 
     /** */
     private final FailureProcessor failureProcessor;
@@ -155,7 +155,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
 
     /** */
     public MetaStorage(
-        GridCacheSharedContext cctx,
+        GridCacheSharedContext<?, ?> cctx,
         DataRegion dataRegion,
         DataRegionMetricsImpl regionMetrics,
         boolean readOnly
@@ -224,7 +224,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
         for (Iterator<IgniteBiTuple<String, byte[]>> it = tmpStorage.stream().iterator(); it.hasNext(); ) {
             IgniteBiTuple<String, byte[]> t = it.next();
 
-            putData(t.get1(), t.get2());
+            writeRaw(t.get1(), t.get2());
         }
 
         try {
@@ -261,45 +261,50 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
 
     /** {@inheritDoc} */
     @Override public Serializable read(String key) throws IgniteCheckedException {
-        byte[] data = getData(key);
+        byte[] data = readRaw(key);
 
-        Object result = null;
+        Serializable res = null;
 
         if (data != null)
-            result = marshaller.unmarshal(data, getClass().getClassLoader());
+            res = marshaller.unmarshal(data, U.gridClassLoader());
 
-        return (Serializable)result;
+        return res;
     }
 
+
     /** {@inheritDoc} */
-    @Override public Map<String, ? extends Serializable> readForPredicate(IgnitePredicate<String> keyPred)
-        throws IgniteCheckedException {
-        Map<String, Serializable> res = null;
+    @Override public void iterate(
+        String keyPrefix,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) throws IgniteCheckedException {
+        if (empty)
+            return;
 
-        if (readOnly) {
-            if (empty)
-                return Collections.emptyMap();
+        Iterator<Map.Entry<String, byte[]>> updatesIter = null;
 
+        if (readOnly) {
             if (lastUpdates != null) {
-                for (Map.Entry<String, byte[]> lastUpdate : lastUpdates.entrySet()) {
-                    if (keyPred.apply(lastUpdate.getKey())) {
-                        byte[] valBytes = lastUpdate.getValue();
+                SortedMap<String, byte[]> prefixedSubmap = lastUpdates.subMap(keyPrefix, keyPrefix + "\uFFFF");
 
-                        if (valBytes == TOMBSTONE)
-                            continue;
+                if (!prefixedSubmap.isEmpty())
+                    updatesIter = prefixedSubmap.entrySet().iterator();
+            }
+        }
 
-                        if (res == null)
-                            res = new HashMap<>();
+        Map.Entry<String, byte[]> curUpdatesEntry = null;
 
-                        Serializable val = marshaller.unmarshal(valBytes, getClass().getClassLoader());
+        if (updatesIter != null) {
+            assert updatesIter.hasNext();
 
-                        res.put(lastUpdate.getKey(), val);
-                    }
-                }
-            }
+            curUpdatesEntry = updatesIter.next();
         }
 
-        GridCursor<MetastorageDataRow> cur = tree.find(null, null);
+        MetastorageDataRow lower = new MetastorageDataRow(keyPrefix, null);
+
+        MetastorageDataRow upper = new MetastorageDataRow(keyPrefix + "\uFFFF", null);
+
+        GridCursor<MetastorageDataRow> cur = tree.find(lower, upper);
 
         while (cur.next()) {
             MetastorageDataRow row = cur.get();
@@ -307,24 +312,49 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
             String key = row.key();
             byte[] valBytes = row.value();
 
-            if (keyPred.apply(key)) {
-                // Either already added it, or this is a tombstone -> ignore.
-                if (lastUpdates != null && lastUpdates.containsKey(key))
-                    continue;
-
-                if (res == null)
-                    res = new HashMap<>();
+            int c = 0;
 
-                Serializable val = marshaller.unmarshal(valBytes, getClass().getClassLoader());
+            while (curUpdatesEntry != null && (c = curUpdatesEntry.getKey().compareTo(key)) < 0)
+                curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
 
-                res.put(key, val);
-            }
+            if (curUpdatesEntry != null && c == 0)
+                curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+            else
+                applyCallback(cb, unmarshal, key, valBytes);
         }
 
-        if (res == null)
-            res = Collections.emptyMap();
+        while (curUpdatesEntry != null)
+            curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+    }
 
-        return res;
+    /** */
+    private Map.Entry<String, byte[]> advanceCurrentUpdatesEntry(
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal,
+        Iterator<Map.Entry<String, byte[]>> updatesIter,
+        Map.Entry<String, byte[]> curUpdatesEntry
+    ) throws IgniteCheckedException {
+        applyCallback(cb, unmarshal, curUpdatesEntry.getKey(), curUpdatesEntry.getValue());
+
+        return updatesIter.hasNext() ? updatesIter.next() : null;
+    }
+
+    /** */
+    private void applyCallback(
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal,
+        String key,
+        byte[] valBytes
+    ) throws IgniteCheckedException {
+        if (valBytes != TOMBSTONE) {
+            if (unmarshal) {
+                Serializable val = marshaller.unmarshal(valBytes, U.gridClassLoader());
+
+                cb.accept(key, val);
+            }
+            else
+                cb.accept(key, valBytes);
+        }
     }
 
     /**
@@ -350,7 +380,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
 
         byte[] data = marshaller.marshal(val);
 
-        putData(key, data);
+        writeRaw(key, data);
     }
 
     /** {@inheritDoc} */
@@ -358,8 +388,8 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
         removeData(key);
     }
 
-    /** */
-    public void putData(String key, byte[] data) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public void writeRaw(String key, byte[] data) throws IgniteCheckedException {
         if (!readOnly) {
             WALPointer ptr = wal.log(new MetastoreDataRecord(key, data));
 
@@ -380,8 +410,8 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
         }
     }
 
-    /** */
-    public byte[] getData(String key) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public byte[] readRaw(String key) throws IgniteCheckedException {
         if (readOnly) {
             if (lastUpdates != null) {
                 byte[] res = lastUpdates.get(key);
@@ -608,13 +638,13 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
     public void applyUpdate(String key, byte[] value) throws IgniteCheckedException {
         if (readOnly) {
             if (lastUpdates == null)
-                lastUpdates = new HashMap<>();
+                lastUpdates = new TreeMap<>();
 
             lastUpdates.put(key, value != null ? value : TOMBSTONE);
         }
         else {
             if (value != null)
-                putData(key, value);
+                writeRaw(key, value);
             else
                 removeData(key);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
index 86cbf0d..40d2245 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
@@ -17,10 +17,8 @@
 package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
 import java.io.Serializable;
-import java.util.Map;
-
+import java.util.function.BiConsumer;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.lang.IgnitePredicate;
 
 /**
  *
@@ -29,12 +27,21 @@ public interface ReadOnlyMetastorage {
     /** */
     Serializable read(String key) throws IgniteCheckedException;
 
+    /** */
+    byte[] readRaw(String key) throws IgniteCheckedException;
+
     /**
-     * Read all keys matching provided predicate.
+     * Read all key/value pairs whose key starts with the provided prefix.
+     * It is guaranteed that the callback will be applied to matching keys in ascending order.
      *
-     * @param keyPred Key predicate.
-     * @return Matched key-value pairs.
+     * @param keyPrefix Key prefix.
+     * @param cb Callback to invoke on each matching key/value pair.
+     * @param unmarshal {@code True} if object passed into {@code cb} should be unmarshalled.
      * @throws IgniteCheckedException If failed.
      */
-    Map<String, ? extends Serializable> readForPredicate(IgnitePredicate<String> keyPred) throws IgniteCheckedException;
+    public void iterate(
+        String keyPrefix,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) throws IgniteCheckedException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
index ab48afc..41cd7ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
@@ -28,5 +28,8 @@ public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
     public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
 
     /** */
+    public void writeRaw(String key, byte[] data) throws IgniteCheckedException;
+
+    /** */
     public void remove(@NotNull String key) throws IgniteCheckedException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e56b05b..72f2f3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
 import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
 import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
@@ -301,6 +302,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
     }
 
     /** {@inheritDoc} */
+    @Override public DistributedMetaStorage distributedMetastorage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public GridTaskSessionProcessor session() {
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index b347d39..e04feff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -66,7 +66,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -630,11 +629,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
         sharedCtx.io().addCacheHandler(
             0, GridChangeGlobalStateMessageResponse.class,
-            new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
-                @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
-                    processChangeGlobalStateResponse(nodeId, msg);
-                }
-            });
+            this::processChangeGlobalStateResponse
+        );
     }
 
     /** {@inheritDoc} */
@@ -1163,8 +1159,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
             @Override public void run() {
                 boolean client = ctx.clientNode();
 
-                Exception e = null;
-
                 try {
                     if (ctx.service() instanceof GridServiceProcessor) {
                         GridServiceProcessor srvcProc = (GridServiceProcessor)ctx.service();
@@ -1182,6 +1176,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
 
                     ctx.encryption().onActivate(ctx);
 
+                    ((IgniteChangeGlobalStateSupport)ctx.distributedMetastorage()).onActivate(ctx);
+
                     if (log.isInfoEnabled())
                         log.info("Successfully performed final activation steps [nodeId="
                             + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorage.java
new file mode 100644
index 0000000..47236cb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorage.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * API for distributed data storage with the ability to write into it.
+ *
+ * @see ReadableDistributedMetaStorage
+ */
+public interface DistributedMetaStorage extends ReadableDistributedMetaStorage {
+    /**
+     * Write value into distributed metastorage.
+     *
+     * @param key The key.
+     * @param val Value to write. Must not be null.
+     * @throws IgniteCheckedException If cluster is in deactivated state.
+     */
+    void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+
+    /**
+     * Remove value from distributed metastorage.
+     *
+     * @param key The key.
+     * @throws IgniteCheckedException If cluster is in deactivated state.
+     */
+    void remove(@NotNull String key) throws IgniteCheckedException;
+
+    /**
+     * Write value into distributed metastorage but only if current value matches the expected one.
+     *
+     * @param key The key.
+     * @param expVal Expected value. Might be null.
+     * @param newVal Value to write. Must not be null.
+     * @throws IgniteCheckedException If cluster is in deactivated state.
+     * @return {@code True} if expected value matched the actual one and write was completed successfully.
+     *      {@code False} otherwise.
+     */
+    boolean compareAndSet(
+        @NotNull String key,
+        @Nullable Serializable expVal,
+        @NotNull Serializable newVal
+    ) throws IgniteCheckedException;
+
+    /**
+     * Remove value from distributed metastorage but only if current value matches the expected one.
+     *
+     * @param key The key.
+     * @param expVal Expected value. Must not be null.
+     * @throws IgniteCheckedException If cluster is in deactivated state.
+     * @return {@code True} if expected value matched the actual one and remove was completed successfully.
+     *      {@code False} otherwise.
+     */
+    boolean compareAndRemove(@NotNull String key, @NotNull Serializable expVal) throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageListener.java
new file mode 100644
index 0000000..8c5cb30
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.io.Serializable;
+import java.util.function.Predicate;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Listener for distributed metastorage data updates.
+ *
+ * @see ReadableDistributedMetaStorage#listen(Predicate, DistributedMetaStorageListener)
+ */
+@FunctionalInterface
+public interface DistributedMetaStorageListener<T extends Serializable> {
+    /**
+     * Invoked in two cases:
+     * <ul>
+     *     <li>data was dynamically updated;</li>
+     *     <li>node was activated. In this case {@code oldVal} and {@code newVal} might be different only if new data
+     *     was received from cluster before activation.</li>
+     * </ul>
+     *
+     * @param key The key.
+     * @param oldVal Previous value associated with the key.
+     * @param newVal New value after update.
+     */
+    void onUpdate(@NotNull String key, @Nullable T oldVal, @Nullable T newVal);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetastorageLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetastorageLifecycleListener.java
new file mode 100644
index 0000000..6c07ddc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetastorageLifecycleListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+/**
+ * Listener for {@link DistributedMetaStorage} lifecycle events.
+ */
+public interface DistributedMetastorageLifecycleListener {
+    /**
+     * Called when global metastorage is ready for reading.
+     * <br>
+     * Normally this is the place where you should add listeners and read required data.
+     * Note that this data might be outdated.
+     *
+     * @param metastorage Read-only global metastorage.
+     * @see DistributedMetaStorageListener
+     */
+    default void onReadyForRead(ReadableDistributedMetaStorage metastorage) {}
+
+    /**
+     * Called when global metastorage is available for writing. The given instance is guaranteed to be
+     * valid until cluster deactivation. In that case this method will be invoked again once the cluster
+     * is reactivated.
+     *
+     * @param metastorage Global metastorage instance.
+     */
+    default void onReadyForWrite(DistributedMetaStorage metastorage) {}
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/ReadableDistributedMetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/ReadableDistributedMetaStorage.java
new file mode 100644
index 0000000..7047079
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/ReadableDistributedMetaStorage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * API for distributed data storage. It is guaranteed that every read value is the same on every node in the cluster
+ * all the time.
+ */
+public interface ReadableDistributedMetaStorage {
+    /**
+     * Get value by the key.
+     *
+     * @param key The key.
+     * @return Value associated with the key.
+     * @throws IgniteCheckedException If reading or unmarshalling failed.
+     */
+    @Nullable <T extends Serializable> T read(@NotNull String key) throws IgniteCheckedException;
+
+    /**
+     * Iterate over all values corresponding to the keys with given prefix. It is guaranteed that iteration will be
+     * executed in ascending keys order.
+     *
+     * @param keyPrefix Prefix for the keys that will be iterated.
+     * @param cb Callback that will be applied to all {@code <key, value>} pairs.
+     * @throws IgniteCheckedException If reading or unmarshalling failed.
+     */
+    void iterate(
+        @NotNull String keyPrefix,
+        @NotNull BiConsumer<String, ? super Serializable> cb
+    ) throws IgniteCheckedException;
+
+    /**
+     * Add listener on data updates.
+     *
+     * @param keyPred Predicate to check whether this listener should be invoked on given key update or not.
+     * @param lsnr Listener object.
+     * @see DistributedMetaStorageListener
+     */
+    void listen(@NotNull Predicate<String> keyPred, DistributedMetaStorageListener<?> lsnr);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageBridge.java
new file mode 100644
index 0000000..f6d83fa
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageBridge.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Bridge interface to access data storage in {@link DistributedMetaStorageImpl}.
+ */
+interface DistributedMetaStorageBridge {
+    /**
+     * Get data by key.
+     *
+     * @param globalKey The key.
+     * @param unmarshal Whether the value should be unmarshalled or not.
+     * @return Value associated with the key.
+     * @throws IgniteCheckedException If reading or unmarshalling failed.
+     */
+    Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException;
+
+    /**
+     * Iterate, in guaranteed ascending key order, over all values whose keys start with the given prefix.
+     *
+     * @param globalKeyPrefix Prefix for the keys that will be iterated.
+     * @param cb Callback that will be applied to all {@code <key, value>} pairs.
+     * @param unmarshal Whether the values should be unmarshalled or not.
+     * @throws IgniteCheckedException If reading or unmarshalling failed.
+     */
+    void iterate(
+        String globalKeyPrefix,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) throws IgniteCheckedException;
+
+    /**
+     * Write data into storage.
+     *
+     * @param globalKey The key.
+     * @param valBytes Value bytes.
+     * @throws IgniteCheckedException If some IO problem occurred.
+     */
+    void write(String globalKey, @Nullable byte[] valBytes) throws IgniteCheckedException;
+
+    /**
+     * Invoked when update message was received. Prepares storage to the writing of new value and notifies listeners
+     * (optionally).
+     *
+     * @param histItem Update data.
+     * @param val Unmarshalled value that needs to be written. This value is ignored if listeners shouldn't be notified.
+     * @param notifyListeners Whether listeners should be notified about update or not.
+     * @throws IgniteCheckedException If some IO or unmarshalling errors occurred.
+     */
+    void onUpdateMessage(
+        DistributedMetaStorageHistoryItem histItem,
+        Serializable val,
+        boolean notifyListeners
+    ) throws IgniteCheckedException;
+
+    /**
+     * Remove information about the specific update from the history.
+     *
+     * @param ver Specific version for which the update information should be deleted.
+     * @throws IgniteCheckedException If some IO error occurred.
+     */
+    void removeHistoryItem(long ver) throws IgniteCheckedException;
+
+    /**
+     * Returns all {@code <key, value>} pairs currently stored in distributed metastorage. Values are not unmarshalled.
+     *
+     * @return Array of all keys and values.
+     */
+    DistributedMetaStorageHistoryItem[] localFullData() throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java
similarity index 53%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java
index ab48afc..a403454 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java
@@ -14,19 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
-import java.io.Serializable;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final boolean updated;
 
-/**
- *
- */
-public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
     /** */
-    public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+    public DistributedMetaStorageCasAckMessage(UUID reqId, boolean active, boolean updated) {
+        super(reqId, active);
+        this.updated = updated;
+    }
 
     /** */
-    public void remove(@NotNull String key) throws IgniteCheckedException;
+    public boolean updated() {
+        return updated;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DistributedMetaStorageCasAckMessage.class, this);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
new file mode 100644
index 0000000..d5d5f8f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/** Update message that additionally carries expected value bytes and a {@code matches} flag reported in the ack. */
+class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Expected value bytes, exactly as passed to the constructor. */
+    private final byte[] expectedVal;
+
+    /** Flag forwarded to the ack message as its {@code updated} argument; {@code true} until changed by the setter. */
+    private boolean matches = true;
+
+    /** Passes {@code reqId}, {@code key} and {@code valBytes} to the superclass and stores {@code expValBytes}. */
+    public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) {
+        super(reqId, key, valBytes);
+
+        expectedVal = expValBytes;
+    }
+
+    /** @return Expected value bytes (the stored array itself, not a copy). */
+    public byte[] expectedValue() {
+        return expectedVal;
+    }
+
+    /** @param matches New value of the {@code matches} flag. */
+    public void setMatches(boolean matches) {
+        this.matches = matches;
+    }
+
+    /** @return Current value of the {@code matches} flag. */
+    public boolean matches() {
+        return matches;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public DiscoveryCustomMessage ackMessage() {
+        return new DistributedMetaStorageCasAckMessage(requestId(), isActive(), matches);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DistributedMetaStorageCasMessage.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageClusterNodeData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageClusterNodeData.java
new file mode 100644
index 0000000..94351dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageClusterNodeData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+
+/** Serializable holder of a metastorage version together with full-data, history and updates item arrays. */
+@SuppressWarnings("PublicField")
+class DistributedMetaStorageClusterNodeData implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Distributed metastorage version. */
+    public final DistributedMetaStorageVersion ver;
+
+    /** Full data items. */
+    public final DistributedMetaStorageHistoryItem[] fullData;
+
+    /** History items. */
+    public final DistributedMetaStorageHistoryItem[] hist;
+
+    /** Update items. The only non-final field of this class, so it may be reassigned after construction. */
+    public DistributedMetaStorageHistoryItem[] updates;
+
+    /** Stores all arguments in the fields of the same name as is (arrays are not copied). */
+    public DistributedMetaStorageClusterNodeData(
+        DistributedMetaStorageVersion ver,
+        DistributedMetaStorageHistoryItem[] fullData,
+        DistributedMetaStorageHistoryItem[] hist,
+        DistributedMetaStorageHistoryItem[] updates
+    ) {
+        this.fullData = fullData;
+        this.ver = ver;
+        this.hist = hist;
+        this.updates = updates;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageHistoryItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageHistoryItem.java
new file mode 100644
index 0000000..2d1ab50
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageHistoryItem.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** Serializable pair of a key and its value bytes; equality and hash code are based on both. */
+@SuppressWarnings("PublicField")
+class DistributedMetaStorageHistoryItem implements Serializable {
+    /** Shared empty array of history items. */
+    public static final DistributedMetaStorageHistoryItem[] EMPTY_ARRAY = {};
+
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Key. Expected to be non-null: it is dereferenced without a check in size estimation and equality. */
+    @GridToStringInclude
+    public final String key;
+
+    /** Value bytes. May be {@code null}, which is handled explicitly in {@link #estimateSize()}. */
+    @GridToStringInclude
+    public final byte[] valBytes;
+
+    /** Stores the given key and value bytes as is (the array is not copied). */
+    public DistributedMetaStorageHistoryItem(String key, byte[] valBytes) {
+        this.key = key;
+        this.valBytes = valBytes;
+    }
+
+    /** @return Approximate size: 8 + 2 per key character + value bytes length ({@code 0} for {@code null} value). */
+    public long estimateSize() {
+        // String encoding is ignored to make estimation faster. The constant 8 covers 2 "size" values.
+        return 8 + key.length() * 2 + (valBytes == null ? 0 : valBytes.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        DistributedMetaStorageHistoryItem item = (DistributedMetaStorageHistoryItem)o;
+
+        return key.equals(item.key) && Arrays.equals(valBytes, item.valBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * key.hashCode() + Arrays.hashCode(valBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DistributedMetaStorageHistoryItem.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
new file mode 100644
index 0000000..d96c73d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -0,0 +1,1169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.LongStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cluster.BaselineTopology;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageListener;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.META_STORAGE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemPrefix;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemVer;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.marshal;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
+
+/**
+ * Implementation of {@link DistributedMetaStorage} based on {@link MetaStorage} for persistence and discovery SPI
+ * for communication.
+ */
+public class DistributedMetaStorageImpl extends GridProcessorAdapter
+    implements DistributedMetaStorage, IgniteChangeGlobalStateSupport
+{
+    /** Component ID required for {@link DiscoveryDataBag} instances. */
+    private static final int COMPONENT_ID = META_STORAGE.ordinal();
+
+    /** Default upper bound of history size in bytes. */
+    private static final long DFLT_MAX_HISTORY_BYTES = 100 * 1024 * 1024;
+
+    /** Cached subscription processor instance. Exists to make code shorter. */
+    private final GridInternalSubscriptionProcessor subscrProcessor;
+
+    /** Bridge. Has some "phase-specific" code. Exists to avoid countless {@code if}s in code. */
+    private volatile DistributedMetaStorageBridge bridge = new NotAvailableDistributedMetaStorageBridge();
+
+    /**
+     * {@link MetastorageLifecycleListener#onReadyForReadWrite(ReadWriteMetastorage)} is invoked asynchronously after
+     * cluster activation so there's a chance of a gap where someone already tries to write data but distributed
+     * metastorage is not "writeable". Current latch aims to resolve this issue - every "write" action waits for it
+     * before actually trying to write anything.
+     */
+    private volatile CountDownLatch writeAvailable = new CountDownLatch(1);
+
+    /**
+     * Version of distributed metastorage.
+     */
+    volatile DistributedMetaStorageVersion ver;
+
+    /** Listeners set. */
+    final Set<IgniteBiTuple<Predicate<String>, DistributedMetaStorageListener<Serializable>>> lsnrs =
+        new GridConcurrentLinkedHashSet<>();
+
+    /**
+     * Map that contains latest changes in distributed metastorage. There should be no gaps in versions and the latest
+     * version is always present in the map. This means that the map is empty only if version is 0.
+     */
+    //TODO Use something similar to java.util.ArrayDeque.
+    private final Map<Long, DistributedMetaStorageHistoryItem> histCache = new ConcurrentHashMap<>();
+
+    /** Approximate number of bytes in values of {@link #histCache} map. */
+    private long histSizeApproximation;
+
+    /**
+     * Maximal acceptable value of {@link #histSizeApproximation}. After every write history would shrink until its size
+     * is not greater than given value.
+     */
+    private final long histMaxBytes = IgniteSystemProperties.getLong(
+        IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES,
+        DFLT_MAX_HISTORY_BYTES
+    );
+
+    /**
+     * Map with futures used to wait for async write/remove operations completion.
+     */
+    private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();
+
+    /**
+     * Some extra values that are useful only when node is not active. Otherwise it is set to {@code null} to remove
+     * excessive data from the heap.
+     *
+     * @see StartupExtras
+     */
+    private volatile StartupExtras startupExtras = new StartupExtras();
+
+    /**
+     * Lock to access/update {@link #bridge} and {@link #startupExtras} fields (probably some others as well).
+     */
+    private final Object innerStateLock = new Object();
+
+    /**
+     * Becomes {@code true} if node was deactivated, this information is useful for joining node validation.
+     *
+     * @see #validateNode(ClusterNode, DiscoveryDataBag.JoiningNodeDiscoveryData)
+     */
+    private boolean wasDeactivated;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public DistributedMetaStorageImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        subscrProcessor = ctx.internalSubscriptionProcessor();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.clientNode())
+            return;
+
+        if (isPersistenceEnabled(ctx.config())) {
+            subscrProcessor.registerMetastorageListener(new MetastorageLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void onReadyForRead(
+                    ReadOnlyMetastorage metastorage
+                ) throws IgniteCheckedException {
+                    onMetaStorageReadyForRead(metastorage);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyForReadWrite(
+                    ReadWriteMetastorage metastorage
+                ) throws IgniteCheckedException {
+                    onMetaStorageReadyForWrite(metastorage);
+                }
+            });
+        }
+        else {
+            ver = DistributedMetaStorageVersion.INITIAL_VERSION;
+
+            bridge = new EmptyDistributedMetaStorageBridge();
+
+            for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+                subscriber.onReadyForRead(this);
+        }
+
+        GridDiscoveryManager discovery = ctx.discovery();
+
+        discovery.setCustomEventListener(
+            DistributedMetaStorageUpdateMessage.class,
+            this::onUpdateMessage
+        );
+
+        discovery.setCustomEventListener(
+            DistributedMetaStorageUpdateAckMessage.class,
+            this::onAckMessage
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+        if (active)
+            onActivate(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        if (ctx.clientNode())
+            return;
+
+        if (!isPersistenceEnabled(ctx.config())) {
+            if (!(bridge instanceof InMemoryCachedDistributedMetaStorageBridge)) {
+                synchronized (innerStateLock) {
+                    assert startupExtras != null;
+
+                    InMemoryCachedDistributedMetaStorageBridge memCachedBridge =
+                        new InMemoryCachedDistributedMetaStorageBridge(this);
+
+                    memCachedBridge.restore(startupExtras);
+
+                    executeDeferredUpdates(memCachedBridge);
+
+                    bridge = memCachedBridge;
+
+                    startupExtras = null;
+                }
+            }
+
+            for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+                subscriber.onReadyForWrite(this);
+
+            writeAvailable.countDown();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeActivate(GridKernalContext kctx) {
+        if (ctx.clientNode())
+            return;
+
+        synchronized (innerStateLock) {
+            wasDeactivated = true;
+
+            if (isPersistenceEnabled(ctx.config())) {
+                try {
+                    DistributedMetaStorageHistoryItem[] locFullData = bridge.localFullData();
+
+                    bridge = new ReadOnlyDistributedMetaStorageBridge(locFullData);
+                }
+                catch (IgniteCheckedException e) {
+                    throw criticalError(e);
+                }
+
+                startupExtras = new StartupExtras();
+            }
+
+            if (writeAvailable.getCount() > 0)
+                writeAvailable.countDown();
+
+            writeAvailable = new CountDownLatch(1);
+        }
+    }
+
+    /** Whether cluster is active at this moment or not. Also returns {@code true} if cluster is being activated. */
+    private boolean isActive() {
+        return ctx.state().clusterState().active();
+    }
+
+    /**
+     * Implementation for {@link MetastorageLifecycleListener#onReadyForRead(ReadOnlyMetastorage)} listener.
+     * Invoked after node was started but before it was activated (only in persistent clusters).
+     *
+     * @param metastorage Local metastorage instance available for reading.
+     * @throws IgniteCheckedException If there were any issues while metastorage reading.
+     * @see MetastorageLifecycleListener#onReadyForRead(ReadOnlyMetastorage)
+     */
+    private void onMetaStorageReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
+        assert isPersistenceEnabled(ctx.config());
+
+        assert startupExtras != null;
+
+        ReadOnlyDistributedMetaStorageBridge readOnlyBridge = new ReadOnlyDistributedMetaStorageBridge();
+
+        lock();
+
+        try {
+            ver = readOnlyBridge.readInitialData(metastorage, startupExtras);
+
+            metastorage.iterate(
+                historyItemPrefix(),
+                (key, val) -> addToHistoryCache(historyItemVer(key), (DistributedMetaStorageHistoryItem)val),
+                true
+            );
+        }
+        finally {
+            unlock();
+        }
+
+        bridge = readOnlyBridge;
+
+        for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+            subscriber.onReadyForRead(this);
+    }
+
+    /**
+     * Implementation for {@link MetastorageLifecycleListener#onReadyForReadWrite(ReadWriteMetastorage)} listener.
+     * Invoked after each activation (only in persistent clusters).
+     *
+     * @param metastorage Local metastorage instance available for writing.
+     * @throws IgniteCheckedException If there were any errors while accessing local metastorage.
+     * @see MetastorageLifecycleListener#onReadyForReadWrite(ReadWriteMetastorage)
+     */
+    private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException {
+        assert isPersistenceEnabled(ctx.config());
+
+        synchronized (innerStateLock) {
+            WritableDistributedMetaStorageBridge writableBridge = new WritableDistributedMetaStorageBridge(this, metastorage);
+
+            if (startupExtras != null) {
+                lock();
+
+                try {
+                    writableBridge.restore(startupExtras);
+                }
+                finally {
+                    unlock();
+                }
+
+                executeDeferredUpdates(writableBridge);
+            }
+
+            bridge = writableBridge;
+
+            startupExtras = null;
+        }
+
+        for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+            subscriber.onReadyForWrite(this);
+
+        writeAvailable.countDown();
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public <T extends Serializable> T read(@NotNull String key) throws IgniteCheckedException {
+        lock();
+
+        try {
+            return (T)bridge.read(key, true);
+        }
+        finally {
+            unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
+        assert val != null : key;
+
+        startWrite(key, marshal(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(@NotNull String key) throws IgniteCheckedException {
+        startWrite(key, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean compareAndSet(
+        @NotNull String key,
+        @Nullable Serializable expVal,
+        @NotNull Serializable newVal
+    ) throws IgniteCheckedException {
+        assert newVal != null : key;
+
+        return startCas(key, marshal(expVal), marshal(newVal));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean compareAndRemove(
+        @NotNull String key,
+        @NotNull Serializable expVal
+    ) throws IgniteCheckedException {
+        assert expVal != null : key;
+
+        return startCas(key, marshal(expVal), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void iterate(
+        @NotNull String keyPrefix,
+        @NotNull BiConsumer<String, ? super Serializable> cb
+    ) throws IgniteCheckedException {
+        lock();
+
+        try {
+            bridge.iterate(keyPrefix, cb, true);
+        }
+        finally {
+            unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void listen(@NotNull Predicate<String> keyPred, DistributedMetaStorageListener<?> lsnr) {
+        DistributedMetaStorageListener<Serializable> lsnrUnchecked = (DistributedMetaStorageListener<Serializable>)lsnr;
+
+        lsnrs.add(new IgniteBiTuple<>(keyPred, lsnrUnchecked));
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public DiscoveryDataExchangeType discoveryDataType() {
+        return META_STORAGE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        // Client nodes do not contribute any joining data.
+        if (!ctx.clientNode()) {
+            assert startupExtras != null;
+
+            // Copying the cache into a TreeMap orders the history items by their version.
+            DistributedMetaStorageHistoryItem[] sortedHist = new TreeMap<>(histCache)
+                .values()
+                .toArray(EMPTY_ARRAY);
+
+            DistributedMetaStorageVersion sentVer;
+
+            // A read-only bridge provides its own version; otherwise the version field of this component is sent.
+            if (bridge instanceof ReadOnlyDistributedMetaStorageBridge)
+                sentVer = ((ReadOnlyDistributedMetaStorageBridge)bridge).version();
+            else
+                sentVer = ver;
+
+            Serializable joiningData = new DistributedMetaStorageJoiningNodeData(
+                getBaselineTopologyId(),
+                sentVer,
+                sortedHist
+            );
+
+            dataBag.addJoiningNodeData(COMPONENT_ID, joiningData);
+        }
+    }
+
+    /** Returns the id of the current baseline topology, or {@code -1} if there's no baseline topology found. */
+    private int getBaselineTopologyId() {
+        BaselineTopology blt = ctx.state().clusterState().baselineTopology();
+
+        if (blt == null)
+            return -1;
+
+        return blt.id();
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public IgniteNodeValidationResult validateNode(
+        ClusterNode node,
+        DiscoveryDataBag.JoiningNodeDiscoveryData discoData
+    ) {
+        // A local client node never rejects joining nodes.
+        if (ctx.clientNode())
+            return null;
+
+        // Nothing to validate if the joining node sent no data or persistence is disabled locally.
+        if (!discoData.hasJoiningNodeData() || !isPersistenceEnabled(ctx.config()))
+            return null;
+
+        // NOTE(review): collectGridNodeData() additionally checks the cast result for null, this method does not.
+        // Confirm that joiningNodeData() cannot return null once hasJoiningNodeData() is true.
+        DistributedMetaStorageJoiningNodeData joiningData =
+            (DistributedMetaStorageJoiningNodeData)discoData.joiningNodeData();
+
+        DistributedMetaStorageVersion remoteVer = joiningData.ver;
+
+        DistributedMetaStorageHistoryItem[] remoteHist = joiningData.hist;
+
+        int remoteHistSize = remoteHist.length;
+
+        int remoteBltId = joiningData.bltId;
+
+        boolean clusterIsActive = isActive();
+
+        // Null means that the joining node is accepted.
+        String errorMsg;
+
+        synchronized (innerStateLock) {
+            DistributedMetaStorageVersion locVer = getActualVersion();
+
+            int locBltId = getBaselineTopologyId();
+
+            int locHistSize = getAvailableHistorySize();
+
+            if (remoteVer.id < locVer.id - locHistSize) {
+                // Remote node is too far behind.
+                // Technically this situation should be banned because there's no way to prove data consistency.
+                errorMsg = null;
+            }
+            else if (remoteVer.id < locVer.id) {
+                // Remote node is behind the cluster version and there's enough history.
+                // Replay local history on top of the remote version: the result must be equal to the local version.
+                DistributedMetaStorageVersion newRemoteVer = remoteVer.nextVersion(
+                    this::historyItem,
+                    remoteVer.id + 1,
+                    locVer.id
+                );
+
+                if (!newRemoteVer.equals(locVer))
+                    errorMsg = "Joining node has conflicting distributed metastorage data.";
+                else
+                    errorMsg = null;
+            }
+            else if (remoteVer.id == locVer.id) {
+                // Remote and local versions match.
+                if (!remoteVer.equals(locVer)) {
+                    errorMsg = S.toString(
+                        "Joining node has conflicting distributed metastorage data:",
+                        "clusterVersion", locVer, false,
+                        "joiningNodeVersion", remoteVer, false
+                    );
+                }
+                else
+                    errorMsg = null;
+            }
+            else if (remoteVer.id <= locVer.id + remoteHistSize) {
+                // Remote node is ahead of the cluster and has enough history.
+                if (clusterIsActive) {
+                    errorMsg = "Attempting to join node with larger distributed metastorage version id." +
+                        " The node is most likely in invalid state and can't be joined.";
+                }
+                else if (wasDeactivated || remoteBltId < locBltId)
+                    errorMsg = "Joining node has conflicting distributed metastorage data.";
+                else {
+                    // Replay the tail of the remote history on top of the local version: the result must be equal
+                    // to the remote version. The tail starts at the first remote item that is missing locally.
+                    DistributedMetaStorageVersion newLocVer = locVer.nextVersion(
+                        remoteHist,
+                        remoteHistSize - (int)(remoteVer.id - locVer.id),
+                        remoteHistSize
+                    );
+
+                    if (!newLocVer.equals(remoteVer))
+                        errorMsg = "Joining node has conflicting distributed metastorage data.";
+                    else
+                        errorMsg = null;
+                }
+            }
+            else {
+                assert remoteVer.id > locVer.id + remoteHistSize;
+
+                // Remote node is too far ahead.
+                if (clusterIsActive) {
+                    errorMsg = "Attempting to join node with larger distributed metastorage version id." +
+                        " The node is most likely in invalid state and can't be joined.";
+                }
+                else if (wasDeactivated || remoteBltId < locBltId)
+                    errorMsg = "Joining node has conflicting distributed metastorage data.";
+                else {
+                    errorMsg = "Joining node doesn't have enough history items in distributed metastorage data." +
+                        " Please check the order in which you start cluster nodes.";
+                }
+            }
+        }
+
+        // The same text is passed as both message arguments of the validation result.
+        return (errorMsg == null) ? null : new IgniteNodeValidationResult(node.id(), errorMsg, errorMsg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        // A local client node provides no data for joining nodes.
+        if (ctx.clientNode())
+            return;
+
+        DiscoveryDataBag.JoiningNodeDiscoveryData discoData = dataBag.newJoinerDiscoveryData(COMPONENT_ID);
+
+        if (!discoData.hasJoiningNodeData())
+            return;
+
+        DistributedMetaStorageJoiningNodeData joiningData =
+            (DistributedMetaStorageJoiningNodeData)discoData.joiningNodeData();
+
+        if (joiningData == null)
+            return;
+
+        DistributedMetaStorageVersion remoteVer = joiningData.ver;
+
+        synchronized (innerStateLock) {
+            //TODO Store it precalculated? Maybe later.
+            DistributedMetaStorageVersion actualVer = getActualVersion();
+
+            if (remoteVer.id > actualVer.id) {
+                // The joining node is ahead of the local node: take the missing updates from its history.
+                assert startupExtras != null;
+
+                DistributedMetaStorageHistoryItem[] hist = joiningData.hist;
+
+                if (remoteVer.id - actualVer.id <= hist.length) {
+                    assert bridge instanceof ReadOnlyDistributedMetaStorageBridge
+                        || bridge instanceof EmptyDistributedMetaStorageBridge;
+
+                    // The last element of the remote history corresponds to remoteVer.id, so version v is located
+                    // (remoteVer.id - v) positions before the end of the array.
+                    for (long v = actualVer.id + 1; v <= remoteVer.id; v++)
+                        updateLater(hist[(int)(v - remoteVer.id + hist.length - 1)]);
+
+                    Serializable nodeData = new DistributedMetaStorageClusterNodeData(remoteVer, null, null, null);
+
+                    dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+                }
+                else
+                    assert false : "Joining node is too far ahead [remoteVer=" + remoteVer + "]";
+            }
+            else {
+                // Common data has already been collected for this component, nothing to add.
+                if (dataBag.commonDataCollectedFor(COMPONENT_ID))
+                    return;
+
+                if (remoteVer.id == actualVer.id) {
+                    // Versions match: only the version is sent, no data.
+                    assert remoteVer.equals(actualVer) : actualVer + " " + remoteVer;
+
+                    Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver, null, null, null);
+
+                    dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+                }
+                else {
+                    int availableHistSize = getAvailableHistorySize();
+
+                    if (actualVer.id - remoteVer.id <= availableHistSize) {
+                        // The joining node is behind, but local history covers the gap: the missing history items
+                        // are passed as the last constructor argument (the one used for updates below).
+                        DistributedMetaStorageHistoryItem[] hist = history(remoteVer.id + 1, actualVer.id);
+
+                        Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver, null, null, hist);
+
+                        dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+                    }
+                    else {
+                        // The joining node is too far behind: send full data together with the history.
+                        DistributedMetaStorageVersion ver0;
+
+                        DistributedMetaStorageHistoryItem[] fullData;
+
+                        DistributedMetaStorageHistoryItem[] hist;
+
+                        if (startupExtras == null || startupExtras.fullNodeData == null) {
+                            // No full data received from another node: use the local storage content.
+                            ver0 = ver;
+
+                            try {
+                                fullData = bridge.localFullData();
+                            }
+                            catch (IgniteCheckedException e) {
+                                throw criticalError(e);
+                            }
+
+                            hist = history(ver.id - histCache.size() + 1, actualVer.id);
+                        }
+                        else {
+                            // Full data previously received from another node is forwarded as is.
+                            ver0 = startupExtras.fullNodeData.ver;
+
+                            fullData = startupExtras.fullNodeData.fullData;
+
+                            hist = startupExtras.fullNodeData.hist;
+                        }
+
+                        DistributedMetaStorageHistoryItem[] updates;
+
+                        // Updates that are still deferred locally are sent along as well.
+                        if (startupExtras != null)
+                            updates = startupExtras.deferredUpdates.toArray(EMPTY_ARRAY);
+                        else
+                            updates = null;
+
+                        Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver0, fullData, hist, updates);
+
+                        dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns number of all available history items. Might be a history from remote node snapshot or/and deferred
+     * updates from another remote node. Depends on the current node state.
+     */
+    private int getAvailableHistorySize() {
+        assert Thread.holdsLock(innerStateLock);
+
+        // No startup extras: only the in-memory history cache counts.
+        if (startupExtras == null)
+            return histCache.size();
+
+        DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+        // History comes either from the local cache or from the full data received from a remote node.
+        int baseSize = fullNodeData == null ? histCache.size() : fullNodeData.hist.length;
+
+        return baseSize + startupExtras.deferredUpdates.size();
+    }
+
+    /**
+     * Returns actual version from the local node. It is just a version for activated node or calculated future
+     * version otherwise.
+     */
+    private DistributedMetaStorageVersion getActualVersion() {
+        assert Thread.holdsLock(innerStateLock);
+
+        if (startupExtras == null)
+            return ver;
+
+        DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+        // Deferred updates are applied on top of either the local version or the received full data version.
+        DistributedMetaStorageVersion baseVer = fullNodeData == null ? ver : fullNodeData.ver;
+
+        return baseVer.nextVersion(startupExtras.deferredUpdates);
+    }
+
+    /**
+     * Returns last update for the specified version.
+     *
+     * @param specificVer Specific version.
+     * @return {@code <key, value>} pair if it was found, {@code null} otherwise.
+     */
+    private DistributedMetaStorageHistoryItem historyItem(long specificVer) {
+        assert Thread.holdsLock(innerStateLock);
+
+        if (startupExtras == null)
+            return histCache.get(specificVer);
+
+        DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+        // The last version that is not covered by the list of deferred updates.
+        long notDeferredVer = fullNodeData == null ? ver.id : fullNodeData.ver.id;
+
+        if (specificVer <= notDeferredVer) {
+            if (fullNodeData == null)
+                return histCache.get(specificVer);
+
+            // The last element of the received history corresponds to the received version.
+            int histIdx = (int)(specificVer - notDeferredVer + fullNodeData.hist.length - 1);
+
+            return histIdx >= 0 ? fullNodeData.hist[histIdx] : null;
+        }
+
+        assert specificVer > notDeferredVer;
+
+        // The first deferred update corresponds to the version right after the not deferred one.
+        int deferredIdx = (int)(specificVer - notDeferredVer - 1);
+
+        List<DistributedMetaStorageHistoryItem> deferredUpdates = startupExtras.deferredUpdates;
+
+        return deferredIdx < deferredUpdates.size() ? deferredUpdates.get(deferredIdx) : null;
+    }
+
+    /**
+     * Returns all updates in the specified range of versions.
+     *
+     * @param startVer Lower bound (inclusive).
+     * @param actualVer Upper bound (inclusive).
+     * @return Array with all requested updates sorted by version in ascending order.
+     */
+    private DistributedMetaStorageHistoryItem[] history(long startVer, long actualVer) {
+        // Both bounds are inclusive; an empty array is produced when startVer is greater than actualVer.
+        LongStream versions = LongStream.rangeClosed(startVer, actualVer);
+
+        return versions
+            .mapToObj(v -> historyItem(v))
+            .toArray(DistributedMetaStorageHistoryItem[]::new);
+    }
+
+    /**
+     * {@link DistributedMetaStorageBridge#localFullData()} invoked on {@link #bridge}.
+     */
+    @TestOnly
+    private DistributedMetaStorageHistoryItem[] localFullData() throws IgniteCheckedException {
+        // Plain delegation to the current bridge; this method itself takes no lock.
+        return bridge.localFullData();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        DistributedMetaStorageClusterNodeData nodeData = (DistributedMetaStorageClusterNodeData)data.commonData();
+
+        if (nodeData != null) {
+            synchronized (innerStateLock) {
+                if (nodeData.fullData == null) {
+                    if (nodeData.updates != null) {
+                        for (DistributedMetaStorageHistoryItem update : nodeData.updates)
+                            updateLater(update);
+                    }
+                }
+                else
+                    writeFullDataLater(nodeData);
+            }
+        }
+    }
+
+    /**
+     * Common implementation for {@link #write(String, Serializable)} and {@link #remove(String)}. Synchronously waits
+     * for operation to be completed.
+     *
+     * @param key The key.
+     * @param valBytes Value bytes to write. Null if value needs to be removed.
+     * @throws IgniteCheckedException If there was an error while sending discovery message or message was sent but
+     *      cluster is not active.
+     */
+    private void startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
+        UUID reqId = UUID.randomUUID();
+
+        GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+        // The future is registered before sending so that the ack handler is able to find it by request id.
+        updateFuts.put(reqId, fut);
+
+        DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
+
+        boolean sent = false;
+
+        try {
+            ctx.discovery().sendCustomEvent(msg);
+
+            sent = true;
+        }
+        finally {
+            // If sending failed, the ack handler will not complete this future, so it is unregistered here
+            // instead of being left in the map forever.
+            if (!sent)
+                updateFuts.remove(reqId);
+        }
+
+        // Synchronously wait until the operation is acknowledged.
+        fut.get();
+    }
+
+    /**
+     * Basically the same as {@link #startWrite(String, byte[])} but for CAS operations.
+     */
+    private boolean startCas(String key, byte[] expValBytes, byte[] newValBytes) throws IgniteCheckedException {
+        UUID reqId = UUID.randomUUID();
+
+        GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+        // The future is registered before sending so that the ack handler is able to find it by request id.
+        updateFuts.put(reqId, fut);
+
+        DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
+
+        boolean sent = false;
+
+        try {
+            ctx.discovery().sendCustomEvent(msg);
+
+            sent = true;
+        }
+        finally {
+            // If sending failed, the ack handler will not complete this future, so it is unregistered here
+            // instead of being left in the map forever.
+            if (!sent)
+                updateFuts.remove(reqId);
+        }
+
+        // Synchronously wait for the result of the CAS operation.
+        return fut.get();
+    }
+
+    /**
+     * Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on
+     * current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason.
+     *
+     * @param topVer Ignored.
+     * @param node Ignored.
+     * @param msg Received message.
+     */
+    private void onUpdateMessage(
+        AffinityTopologyVersion topVer,
+        ClusterNode node,
+        DistributedMetaStorageUpdateMessage msg
+    ) {
+        if (isActive()) {
+            try {
+                U.await(writeAvailable);
+
+                // CAS messages need an additional comparison step, plain updates are written right away.
+                if (!(msg instanceof DistributedMetaStorageCasMessage))
+                    completeWrite(bridge, new DistributedMetaStorageHistoryItem(msg.key(), msg.value()), true);
+                else
+                    completeCas(bridge, (DistributedMetaStorageCasMessage)msg);
+            }
+            catch (IgniteCheckedException | Error e) {
+                throw criticalError(e);
+            }
+        }
+        else {
+            // Nothing is stored while inactive; the message is only marked accordingly.
+            msg.setActive(false);
+        }
+    }
+
+    /**
+     * Invoked when {@link DistributedMetaStorageUpdateAckMessage} received. Completes future if local node is the node
+     * that initiated write operation.
+     *
+     * @param topVer Ignored.
+     * @param node Ignored.
+     * @param msg Received message.
+     */
+    private void onAckMessage(
+        AffinityTopologyVersion topVer,
+        ClusterNode node,
+        DistributedMetaStorageUpdateAckMessage msg
+    ) {
+        GridFutureAdapter<Boolean> fut = updateFuts.remove(msg.requestId());
+
+        // No future is registered under this request id on the local node.
+        if (fut == null)
+            return;
+
+        if (!msg.isActive()) {
+            fut.onDone(new IllegalStateException("Ignite cluster is not active"));
+
+            return;
+        }
+
+        // Only CAS acknowledgements carry a result, plain writes complete with null.
+        Boolean res = null;
+
+        if (msg instanceof DistributedMetaStorageCasAckMessage)
+            res = ((DistributedMetaStorageCasAckMessage)msg).updated();
+
+        fut.onDone(res);
+    }
+
+    /**
+     * Invokes the failure handler with a critical error and rethrows the passed throwable: errors and runtime
+     * exceptions are rethrown as is, everything else is converted into an unchecked exception.
+     * <p>
+     * This method never returns normally. The return type only exists to let callers write
+     * {@code throw criticalError(e);}.
+     *
+     * @param e Throwable that caused the critical failure.
+     * @return Nothing, an exception is always thrown.
+     */
+    private RuntimeException criticalError(Throwable e) {
+        ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+        if (e instanceof Error)
+            throw (Error)e;
+
+        // Rethrow unchecked exceptions as is instead of failing with a ClassCastException on the cast below.
+        if (e instanceof RuntimeException)
+            throw (RuntimeException)e;
+
+        if (e instanceof IgniteCheckedException)
+            throw U.convertException((IgniteCheckedException)e);
+
+        // Any other checked exception is wrapped first so that the original cause is preserved.
+        throw U.convertException(new IgniteCheckedException(e));
+    }
+
+    /**
+     * Store data in local metastorage or in memory.
+     *
+     * @param bridge Bridge to get the access to the storage.
+     * @param histItem {@code <key, value>} pair to process.
+     * @param notifyListeners Whether listeners should be notified or not. {@code false} for data restore on activation.
+     * @throws IgniteCheckedException In case of IO/unmarshalling errors.
+     */
+    private void completeWrite(
+        DistributedMetaStorageBridge bridge,
+        DistributedMetaStorageHistoryItem histItem,
+        boolean notifyListeners
+    ) throws IgniteCheckedException {
+        // The value is unmarshalled only if listeners are going to be notified.
+        Serializable val = null;
+
+        if (notifyListeners)
+            val = unmarshal(histItem.valBytes);
+
+        lock();
+
+        try {
+            bridge.onUpdateMessage(histItem, val, notifyListeners);
+
+            bridge.write(histItem.key, histItem.valBytes);
+        }
+        finally {
+            unlock();
+        }
+
+        // Remember the update in the in-memory history, then trim the history if it became too big.
+        addToHistoryCache(ver.id, histItem);
+
+        shrinkHistory(bridge);
+    }
+
+    /**
+     * Store data in local metastorage or in memory.
+     *
+     * @param bridge Bridge to get the access to the storage.
+     * @param msg Message with all required data.
+     * @see #completeWrite(DistributedMetaStorageBridge, DistributedMetaStorageHistoryItem, boolean)
+     */
+    private void completeCas(
+        DistributedMetaStorageBridge bridge,
+        DistributedMetaStorageCasMessage msg
+    ) throws IgniteCheckedException {
+        // The message is already marked as not matching: there is nothing to compare or write.
+        if (!msg.matches())
+            return;
+
+        boolean matched;
+
+        lock();
+
+        try {
+            Serializable curVal = bridge.read(msg.key(), true);
+
+            Serializable expVal = unmarshal(msg.expectedValue());
+
+            matched = Objects.deepEquals(curVal, expVal);
+
+            // Record the mismatch in the message itself.
+            if (!matched)
+                msg.setMatches(false);
+        }
+        finally {
+            unlock();
+        }
+
+        // Write only if the expected value matched the stored one.
+        if (matched)
+            completeWrite(bridge, new DistributedMetaStorageHistoryItem(msg.key(), msg.value()), true);
+    }
+
+    /**
+     * Store current update into the in-memory history cache. {@link #histSizeApproximation} is recalculated during this
+     * process.
+     *
+     * @param ver Version for the update.
+     * @param histItem Update itself.
+     */
+    void addToHistoryCache(long ver, DistributedMetaStorageHistoryItem histItem) {
+        DistributedMetaStorageHistoryItem prev = histCache.put(ver, histItem);
+
+        // Each version is expected to be stored at most once.
+        assert prev == null : prev;
+
+        long addedBytes = histItem.estimateSize();
+
+        histSizeApproximation += addedBytes;
+    }
+
+    /**
+     * Remove specific update from the in-memory history cache. {@link #histSizeApproximation} is recalculated during
+     * this process.
+     *
+     * @param ver Version of the update.
+     */
+    void removeFromHistoryCache(long ver) {
+        DistributedMetaStorageHistoryItem removed = histCache.remove(ver);
+
+        // Nothing was cached for this version, so the size estimation stays the same.
+        if (removed == null)
+            return;
+
+        histSizeApproximation -= removed.estimateSize();
+    }
+
+    /**
+     * Clear in-memory history cache.
+     */
+    void clearHistoryCache() {
+        histCache.clear();
+
+        // With an empty cache the estimated history size starts from zero again.
+        histSizeApproximation = 0L;
+    }
+
+    /**
+     * Shrinks the history so that its estimated size doesn't exceed {@link #histMaxBytes}. The most recent history
+     * item is never removed.
+     *
+     * @param bridge Bridge to access data storage.
+     * @throws IgniteCheckedException If the bridge failed to remove a history item.
+     */
+    private void shrinkHistory(
+        DistributedMetaStorageBridge bridge
+    ) throws IgniteCheckedException {
+        long maxBytes = histMaxBytes;
+
+        // Check before taking the checkpoint read lock: nothing to do while the history fits into the limit.
+        if (histSizeApproximation <= maxBytes || histCache.size() <= 1)
+            return;
+
+        lock();
+
+        try {
+            // Drop the oldest items one by one until the limit is satisfied or a single item is left.
+            while (histSizeApproximation > maxBytes && histCache.size() > 1) {
+                bridge.removeHistoryItem(ver.id + 1 - histCache.size());
+
+                removeFromHistoryCache(ver.id + 1 - histCache.size());
+            }
+        }
+        finally {
+            unlock();
+        }
+    }
+
+    /**
+     * Add update into the list of deferred updates. Works for inactive nodes only.
+     */
+    private void updateLater(DistributedMetaStorageHistoryItem update) {
+        assert Thread.holdsLock(innerStateLock);
+
+        // Deferred updates live in the startup extras, which must still be present.
+        assert startupExtras != null;
+
+        // Appended to the end: the order of additions defines the order of deferred updates.
+        startupExtras.deferredUpdates.add(update);
+    }
+
+    /**
+     * Invoked at the end of activation.
+     *
+     * @param bridge Bridge to access data storage.
+     * @throws IgniteCheckedException In case of IO/unmarshalling errors.
+     */
+    private void executeDeferredUpdates(DistributedMetaStorageBridge bridge) throws IgniteCheckedException {
+        assert startupExtras != null;
+
+        DistributedMetaStorageHistoryItem lastUpdate = histCache.get(ver.id);
+
+        if (lastUpdate != null) {
+            // Raw bytes are read and compared with the bytes of the latest known update.
+            byte[] storedBytes = (byte[])bridge.read(lastUpdate.key, false);
+
+            boolean upToDate = Arrays.equals(storedBytes, lastUpdate.valBytes);
+
+            // The stored value differs from the latest history item: write the item again.
+            if (!upToDate) {
+                lock();
+
+                try {
+                    bridge.write(lastUpdate.key, lastUpdate.valBytes);
+                }
+                finally {
+                    unlock();
+                }
+            }
+        }
+
+        // Listeners are not notified per update here, they are notified once below instead.
+        for (DistributedMetaStorageHistoryItem deferred : startupExtras.deferredUpdates)
+            completeWrite(bridge, deferred, false);
+
+        notifyListenersBeforeReadyForWrite(bridge);
+    }
+
+    /**
+     * Notify listeners at the end of activation. Even if there was no data restoring.
+     *
+     * @param bridge Bridge to access data storage.
+     */
+    private void notifyListenersBeforeReadyForWrite(
+        DistributedMetaStorageBridge bridge
+    ) throws IgniteCheckedException {
+        // "Old" data comes from the bridge that is currently installed in this component, "new" data comes from
+        // the bridge passed as the argument.
+        DistributedMetaStorageHistoryItem[] oldData = this.bridge.localFullData();
+
+        DistributedMetaStorageHistoryItem[] newData = bridge.localFullData();
+
+        // NOTE(review): the merge below assumes that both arrays are sorted by key in String natural order -
+        // confirm against the implementations of localFullData().
+        int oldIdx = 0, newIdx = 0;
+
+        while (oldIdx < oldData.length && newIdx < newData.length) {
+            String oldKey = oldData[oldIdx].key;
+            byte[] oldValBytes = oldData[oldIdx].valBytes;
+
+            String newKey = newData[newIdx].key;
+            byte[] newValBytes = newData[newIdx].valBytes;
+
+            int c = oldKey.compareTo(newKey);
+
+            if (c < 0) {
+                // Key is present in the old data only: reported with null as the new value.
+                notifyListeners(oldKey, unmarshal(oldValBytes), null);
+
+                ++oldIdx;
+            }
+            else if (c > 0) {
+                // Key is present in the new data only: reported with null as the old value.
+                notifyListeners(newKey, null, unmarshal(newValBytes));
+
+                ++newIdx;
+            }
+            else {
+                // Key is present in both arrays. Listeners are notified even if the value didn't change,
+                // in that case the same unmarshalled instance is passed as both old and new value.
+                Serializable oldVal = unmarshal(oldValBytes);
+
+                Serializable newVal = Arrays.equals(oldValBytes, newValBytes) ? oldVal : unmarshal(newValBytes);
+
+                notifyListeners(oldKey, oldVal, newVal);
+
+                ++oldIdx;
+
+                ++newIdx;
+            }
+        }
+
+        // Remaining keys that exist in the old data only.
+        for (; oldIdx < oldData.length; ++oldIdx)
+            notifyListeners(oldData[oldIdx].key, unmarshal(oldData[oldIdx].valBytes), null);
+
+        // Remaining keys that exist in the new data only.
+        for (; newIdx < newData.length; ++newIdx)
+            notifyListeners(newData[newIdx].key, null, unmarshal(newData[newIdx].valBytes));
+    }
+
+    /**
+     * Ultimate version of {@link #updateLater(DistributedMetaStorageHistoryItem)}.
+     *
+     * @param nodeData Data received from remote node.
+     */
+    private void writeFullDataLater(DistributedMetaStorageClusterNodeData nodeData) {
+        assert Thread.holdsLock(innerStateLock);
+
+        assert nodeData.fullData != null;
+
+        // Received full data supersedes everything that has been deferred so far.
+        startupExtras.fullNodeData = nodeData;
+
+        startupExtras.deferredUpdates.clear();
+
+        if (nodeData.updates == null)
+            return;
+
+        // Updates that came along with the full data become the new deferred updates.
+        for (DistributedMetaStorageHistoryItem pending : nodeData.updates)
+            updateLater(pending);
+
+        // The updates are kept in the deferred list only.
+        nodeData.updates = null;
+    }
+
+    /**
+     * Notify listeners.
+     *
+     * @param key The key.
+     * @param oldVal Old value.
+     * @param newVal New value.
+     */
+    void notifyListeners(String key, Serializable oldVal, Serializable newVal) {
+        for (IgniteBiTuple<Predicate<String>, DistributedMetaStorageListener<Serializable>> entry : lsnrs) {
+            // Skip listeners that are not interested in this key.
+            if (!entry.get1().test(key))
+                continue;
+
+            DistributedMetaStorageListener<Serializable> lsnr = entry.get2();
+
+            try {
+                // ClassCastException might be thrown here if the listener expects another value type.
+                lsnr.onUpdate(key, oldVal, newVal);
+            }
+            catch (Exception e) {
+                // A failing listener must not prevent the remaining listeners from being notified.
+                String errMsg = S.toString(
+                    "Failed to notify global metastorage update listener",
+                    "key", key, false,
+                    "oldVal", oldVal, false,
+                    "newVal", newVal, false,
+                    "lsnr", lsnr, false
+                );
+
+                log.error(errMsg, e);
+            }
+        }
+    }
+
+    /** Checkpoint read lock. */
+    private void lock() {
+        // Must be paired with unlock(), callers in this class do that in a finally block.
+        ctx.cache().context().database().checkpointReadLock();
+    }
+
+    /** Checkpoint read unlock. */
+    private void unlock() {
+        // Releases the checkpoint read lock taken by lock().
+        ctx.cache().context().database().checkpointReadUnlock();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageJoiningNodeData.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageJoiningNodeData.java
index ab48afc..660c36d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageJoiningNodeData.java
@@ -14,19 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
 
 import java.io.Serializable;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.NotNull;
 
-/**
- *
- */
-public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
+/** Distributed metastorage data that a joining node adds to the discovery data bag: baseline id, version, history. */
+@SuppressWarnings("PublicField")
+class DistributedMetaStorageJoiningNodeData implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Baseline topology id as passed to the constructor. */
+    public final int bltId;
+
+    /** Distributed metastorage version as passed to the constructor. */
+    public final DistributedMetaStorageVersion ver;
+
     /** */
-    public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+    public final DistributedMetaStorageHistoryItem[] hist;
 
     /** */
-    public void remove(@NotNull String key) throws IgniteCheckedException;
+    public DistributedMetaStorageJoiningNodeData(
+        int bltId,
+        DistributedMetaStorageVersion ver,
+        DistributedMetaStorageHistoryItem[] hist
+    ) {
+        this.bltId = bltId;
+        this.ver = ver;
+        this.hist = hist;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java
new file mode 100644
index 0000000..0e05d93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/** Discovery custom message that acknowledges a distributed metastorage update identified by its request id. */
+class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Unique id of this message, generated when the message object is created. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Request ID. */
+    private final UUID reqId;
+
+    /** Active flag as passed to the constructor, see {@link #isActive()}. */
+    private final boolean active;
+
+    /** Creates the message for the given request id with the given active flag. */
+    public DistributedMetaStorageUpdateAckMessage(UUID reqId, boolean active) {
+        this.reqId = reqId;
+        this.active = active;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** Returns the id of the request that this message acknowledges. */
+    public UUID requestId() {
+        return reqId;
+    }
+
+    /** Always returns {@code true} for this class. */
+    public boolean isAckMessage() {
+        return true;
+    }
+
+    /** Returns the active flag that was passed to the constructor. */
+    public boolean isActive() {
+        return active;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache createDiscoCache(
+        GridDiscoveryManager mgr,
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache
+    ) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DistributedMetaStorageUpdateAckMessage.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
new file mode 100644
index 0000000..cc3f37f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/** Discovery custom message that carries a request ID, a key and value bytes, and produces a matching ack message. */
+class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Unique ID of this message, generated randomly when the instance is created. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Request ID. */
+    private final UUID reqId;
+
+    /** Key carried by this message. */
+    private final String key;
+
+    /** Value bytes carried by this message; presumably the marshalled value - TODO confirm against the sender. */
+    private final byte[] valBytes;
+
+    /** Flag copied into the ack message; {@code true} unless changed through {@link #setActive(boolean)}. */
+    private boolean active = true;
+
+    /** Creates a message with the given request ID, key and value bytes; the arguments are stored as is. */
+    public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
+        this.reqId = reqId;
+        this.key = key;
+        this.valBytes = valBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** @return Request ID passed to the constructor. */
+    public UUID requestId() {
+        return reqId;
+    }
+
+    /** @return Key passed to the constructor. */
+    public String key() {
+        return key;
+    }
+
+    /** @return Value bytes passed to the constructor (the same array instance, not a copy). */
+    public byte[] value() {
+        return valBytes;
+    }
+
+    /** @return Always {@code false}. */
+    public boolean isAckMessage() {
+        return false;
+    }
+
+    /** Sets the flag that {@link #ackMessage()} passes to the ack message it creates. */
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    /** @return Current value of the "active" flag. */
+    protected boolean isActive() {
+        return active;
+    }
+
+    /** {@inheritDoc} Each call creates a new ack with this request ID and the current "active" flag. */
+    @Override @Nullable public DiscoveryCustomMessage ackMessage() {
+        return new DistributedMetaStorageUpdateAckMessage(reqId, active);
+    }
+
+    /** {@inheritDoc} This implementation always returns {@code true}. */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** {@inheritDoc} This implementation always returns {@code false}. */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public DiscoCache createDiscoCache(
+        GridDiscoveryManager mgr,
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache
+    ) {
+        throw new UnsupportedOperationException("createDiscoCache");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DistributedMetaStorageUpdateMessage.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
new file mode 100644
index 0000000..33c06e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.jetbrains.annotations.Nullable;
+
+/** Static helpers: value (un)marshalling plus construction and parsing of the keys written into {@link MetaStorage}. */
+class DistributedMetaStorageUtil {
+    /**
+     * Common prefix for everything that is going to be written into {@link MetaStorage}. Something that has minimal
+     * chance of collision with the existing keys.
+     */
+    static final String COMMON_KEY_PREFIX = "\u0000";
+
+    /**
+     * Prefix for user keys to store in distributed metastorage.
+     */
+    private static final String KEY_PREFIX = "key-";
+
+    /**
+     * Key for history version.
+     */
+    private static final String HISTORY_VER_KEY = "hist-ver";
+
+    /**
+     * Prefix for history items. Each item will be stored using {@code hist-item-<ver>} key.
+     */
+    private static final String HISTORY_ITEM_KEY_PREFIX = "hist-item-";
+
+    /**
+     * Special key indicating that local data for distributed metastorage is inconsistent because of the ongoing
+     * update/recovery process. Data associated with the key may be ignored.
+     */
+    private static final String CLEANUP_GUARD_KEY = "clean";
+
+    /** Marshals {@code val} with {@link JdkMarshaller#DEFAULT}; returns {@code null} when {@code val} is {@code null}. */
+    @Nullable public static byte[] marshal(Serializable val) throws IgniteCheckedException {
+        return val == null ? null : JdkMarshaller.DEFAULT.marshal(val);
+    }
+
+    /** Unmarshals {@code valBytes} with {@link JdkMarshaller#DEFAULT}; returns {@code null} for {@code null} input. */
+    @Nullable public static Serializable unmarshal(byte[] valBytes) throws IgniteCheckedException {
+        return valBytes == null ? null : JdkMarshaller.DEFAULT.unmarshal(valBytes, U.gridClassLoader());
+    }
+
+    /** @return Local key for {@code globalKey}: {@link #localKeyPrefix()} followed by the global key. */
+    public static String localKey(String globalKey) {
+        return localKeyPrefix() + globalKey;
+    }
+
+    /** @return Global key for {@code locKey}: the local key with {@link #localKeyPrefix()} stripped (prefix is asserted). */
+    public static String globalKey(String locKey) {
+        assert locKey.startsWith(localKeyPrefix()) : locKey;
+
+        return locKey.substring(localKeyPrefix().length());
+    }
+
+    /** @return Prefix of all local user keys: {@link #COMMON_KEY_PREFIX} followed by {@code "key-"}. */
+    public static String localKeyPrefix() {
+        return COMMON_KEY_PREFIX + KEY_PREFIX;
+    }
+
+    /** @return Key of the history item with version {@code ver}: {@link #historyItemPrefix()} followed by the number. */
+    public static String historyItemKey(long ver) {
+        return historyItemPrefix() + ver;
+    }
+
+    /** @return Version parsed from {@code histItemKey}; inverse of {@link #historyItemKey(long)} (prefix is asserted). */
+    public static long historyItemVer(String histItemKey) {
+        assert histItemKey.startsWith(historyItemPrefix());
+
+        return Long.parseLong(histItemKey.substring(historyItemPrefix().length()));
+    }
+
+    /** @return Prefix of all history item keys: {@link #COMMON_KEY_PREFIX} followed by {@code "hist-item-"}. */
+    public static String historyItemPrefix() {
+        return COMMON_KEY_PREFIX + HISTORY_ITEM_KEY_PREFIX;
+    }
+
+    /** @return Key for the history version: {@link #COMMON_KEY_PREFIX} followed by {@code "hist-ver"}. */
+    public static String historyVersionKey() {
+        return COMMON_KEY_PREFIX + HISTORY_VER_KEY;
+    }
+
+    /** @return Cleanup guard key: {@link #COMMON_KEY_PREFIX} followed by {@code "clean"}. */
+    public static String cleanupGuardKey() {
+        return COMMON_KEY_PREFIX + CLEANUP_GUARD_KEY;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageVersion.java
new file mode 100644
index 0000000..7ed775e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageVersion.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** Version class for distributed metastorage. */
+class DistributedMetaStorageVersion implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Version with id "0". */
+    public static final DistributedMetaStorageVersion INITIAL_VERSION = new DistributedMetaStorageVersion(0L, 1L);
+
+    /** Incremental rehashing considering new update information. */
+    private static long nextHash(long hash, DistributedMetaStorageHistoryItem update) {
+        return hash * 31L + ((long)update.key.hashCode() << 32) + Arrays.hashCode(update.valBytes);
+    }
+
+    /**
+     * Id is basically a total number of distributed metastorage updates in current cluster.
+     * Increases incrementally on every update starting with zero.
+     *
+     * @see #INITIAL_VERSION
+     */
+    @GridToStringInclude
+    public final long id;
+
+    /**
+     * Hash of the whole updates list. Hashing algorithm is almost the same as in {@link List#hashCode()}, but with
+     * {@code long} value instead of {@code int}.
+     */
+    @GridToStringInclude
+    public final long hash;
+
+    /**
+     * Constructor with all fields.
+     *
+     * @param id Id.
+     * @param hash Hash.
+     */
+    private DistributedMetaStorageVersion(long id, long hash) {
+        this.id = id;
+        this.hash = hash;
+    }
+
+    /**
+     * Calculate next version after a single update: id is incremented by one and the hash is advanced once.
+     *
+     * @param update Single update.
+     * @return Next version.
+     */
+    public DistributedMetaStorageVersion nextVersion(DistributedMetaStorageHistoryItem update) {
+        return new DistributedMetaStorageVersion(id + 1, nextHash(hash, update));
+    }
+
+    /**
+     * Calculate next version after all passed updates, applied in iteration order; id grows by the collection size.
+     *
+     * @param updates Updates collection.
+     * @return Next version.
+     */
+    public DistributedMetaStorageVersion nextVersion(Collection<DistributedMetaStorageHistoryItem> updates) {
+        long hash = this.hash;
+
+        for (DistributedMetaStorageHistoryItem update : updates)
+            hash = nextHash(hash, update);
+
+        return new DistributedMetaStorageVersion(id + updates.size(), hash);
+    }
+
+    /**
+     * Calculate next version after the updates in the given array range; id grows by {@code toIdx - fromIdx}.
+     *
+     * @param updates Updates array.
+     * @param fromIdx Index of the first required update in the array.
+     * @param toIdx Index after the last required update in the array.
+     * @return Next version.
+     */
+    public DistributedMetaStorageVersion nextVersion(
+        DistributedMetaStorageHistoryItem[] updates,
+        int fromIdx,
+        int toIdx // exclusive
+    ) {
+        long hash = this.hash;
+
+        for (int idx = fromIdx; idx < toIdx; idx++)
+            hash = nextHash(hash, updates[idx]);
+
+        return new DistributedMetaStorageVersion(id + toIdx - fromIdx, hash);
+    }
+
+    /**
+     * Calculate next version after the updates looked up by version; id grows by {@code toVer + 1 - fromVer}.
+     *
+     * @param update Function that provides the update by specific version.
+     * @param fromVer Starting version, inclusive.
+     * @param toVer Ending version, inclusive.
+     * @return Next version.
+     */
+    public DistributedMetaStorageVersion nextVersion(
+        LongFunction<DistributedMetaStorageHistoryItem> update,
+        long fromVer,
+        long toVer // inclusive
+    ) {
+        assert fromVer <= toVer;
+
+        long hash = this.hash;
+
+        for (long idx = fromVer; idx <= toVer; idx++)
+            hash = nextHash(hash, update.apply(idx));
+
+        return new DistributedMetaStorageVersion(id + toVer + 1 - fromVer, hash);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        DistributedMetaStorageVersion ver = (DistributedMetaStorageVersion)o;
+
+        return id == ver.id && hash == ver.hash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * Long.hashCode(id) + Long.hashCode(hash);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DistributedMetaStorageVersion.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/EmptyDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/EmptyDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..2043c55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/EmptyDistributedMetaStorageBridge.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+
+/**
+ * Empty metastorage is the specific implementation to be used in in-memory clusters to have distributed metastorage
+ * without any data until cluster is activated.
+ */
+class EmptyDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+    /** {@inheritDoc} This implementation holds no data and always returns {@code null}. */
+    @Override public Serializable read(String globalKey, boolean unmarshal) {
+        return null;
+    }
+
+    /** {@inheritDoc} This implementation holds no data, so the callback is never invoked. */
+    @Override public void iterate(
+        String globalKeyPrefix,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) {
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void write(String globalKey, byte[] valBytes) {
+        throw new UnsupportedOperationException("write");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void onUpdateMessage(
+        DistributedMetaStorageHistoryItem histItem,
+        Serializable val,
+        boolean notifyListeners
+    ) {
+        throw new UnsupportedOperationException("onUpdateMessage");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void removeHistoryItem(long ver) {
+        throw new UnsupportedOperationException("removeHistoryItem");
+    }
+
+    /** {@inheritDoc} This implementation always returns the shared empty array. */
+    @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+        return DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/InMemoryCachedDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/InMemoryCachedDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..1ff14c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/InMemoryCachedDistributedMetaStorageBridge.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
+
+/** Bridge that keeps all key/value pairs in an in-memory sorted map and updates the version held by {@code dms}. */
+class InMemoryCachedDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+    /** Distributed metastorage whose version, listeners and history cache this bridge works with. */
+    private DistributedMetaStorageImpl dms;
+
+    /** Global key to value bytes, ordered by key. */
+    private final Map<String, byte[]> cache = new ConcurrentSkipListMap<>();
+
+    /** Creates a bridge with an empty cache bound to the given {@code dms}. */
+    public InMemoryCachedDistributedMetaStorageBridge(DistributedMetaStorageImpl dms) {
+        this.dms = dms;
+    }
+
+    /** {@inheritDoc} Returns the raw byte array when {@code unmarshal} is {@code false}; {@code null} if absent. */
+    @Override public Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException {
+        byte[] valBytes = cache.get(globalKey);
+
+        return unmarshal ? unmarshal(valBytes) : valBytes;
+    }
+
+    /** {@inheritDoc} Scans every cached entry and passes on those whose key starts with the prefix. */
+    @Override public void iterate(
+        String globalKeyPrefix,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) throws IgniteCheckedException {
+        for (Map.Entry<String, byte[]> entry : cache.entrySet()) {
+            if (entry.getKey().startsWith(globalKeyPrefix))
+                cb.accept(entry.getKey(), unmarshal ? unmarshal(entry.getValue()) : entry.getValue());
+        }
+    }
+
+    /** {@inheritDoc} A {@code null} value removes the key from the cache. */
+    @Override public void write(String globalKey, @Nullable byte[] valBytes) {
+        if (valBytes == null)
+            cache.remove(globalKey);
+        else
+            cache.put(globalKey, valBytes);
+    }
+
+    /** {@inheritDoc} Advances {@code dms.ver}; listeners get the currently cached value of the key and {@code val}. */
+    @Override public void onUpdateMessage(
+        DistributedMetaStorageHistoryItem histItem,
+        Serializable val,
+        boolean notifyListeners
+    ) throws IgniteCheckedException {
+        dms.ver = dms.ver.nextVersion(histItem);
+
+        if (notifyListeners)
+            dms.notifyListeners(histItem.key, read(histItem.key, true), val);
+    }
+
+    /** {@inheritDoc} No-op in this implementation. */
+    @Override public void removeHistoryItem(long ver) {
+    }
+
+    /** {@inheritDoc} Builds a new array from the cache entries in key order. */
+    @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+        return cache.entrySet().stream().map(
+            entry -> new DistributedMetaStorageHistoryItem(entry.getKey(), entry.getValue())
+        ).toArray(DistributedMetaStorageHistoryItem[]::new);
+    }
+
+    /** Restores version, cached data and history from {@code startupExtras.fullNodeData}; no-op if it is {@code null}. */
+    public void restore(StartupExtras startupExtras) {
+        if (startupExtras.fullNodeData != null) {
+            DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+            dms.ver = fullNodeData.ver;
+
+            for (DistributedMetaStorageHistoryItem item : fullNodeData.fullData)
+                cache.put(item.key, item.valBytes);
+
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+
+                dms.addToHistoryCache(dms.ver.id + i + 1 - len, histItem); // Last item gets version dms.ver.id.
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/NotAvailableDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/NotAvailableDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..5417c2c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/NotAvailableDistributedMetaStorageBridge.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+
+/** Bridge implementation in which every operation throws {@link UnsupportedOperationException}. */
+class NotAvailableDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public Serializable read(String globalKey, boolean unmarshal) {
+        throw new UnsupportedOperationException("read");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void iterate(
+        String globalKeyPre,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) {
+        throw new UnsupportedOperationException("iterate");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void write(String globalKey, byte[] valBytes) {
+        throw new UnsupportedOperationException("write");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void onUpdateMessage(
+        DistributedMetaStorageHistoryItem histItem,
+        Serializable val,
+        boolean notifyListeners
+    ) {
+        throw new UnsupportedOperationException("onUpdateMessage");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void removeHistoryItem(long ver) {
+        throw new UnsupportedOperationException("removeHistoryItem");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+        throw new UnsupportedOperationException("localFullData");
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..84c955d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.cleanupGuardKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.globalKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyVersionKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKeyPrefix;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
+
+/** Read-only bridge over an array of items that is binary-searched by key; modifying operations are unsupported. */
+class ReadOnlyDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+    /** Compares items by key only; used for the binary searches over {@link #locFullData}. */
+    private static final Comparator<DistributedMetaStorageHistoryItem> HISTORY_ITEM_KEY_COMPARATOR =
+        Comparator.comparing(item -> item.key);
+
+    /** Local data; has to be sorted by key because it is binary-searched. */
+    private DistributedMetaStorageHistoryItem[] locFullData;
+
+    /** Version set by {@link #readInitialData}; {@code null} until that method has been called. */
+    private DistributedMetaStorageVersion ver;
+
+    /** Creates a bridge without data; presumably {@link #readInitialData} is called before any read - TODO confirm. */
+    public ReadOnlyDistributedMetaStorageBridge() {
+    }
+
+    /** Creates a bridge over the given array (stored as is, not copied); it has to be sorted by key. */
+    public ReadOnlyDistributedMetaStorageBridge(
+        DistributedMetaStorageHistoryItem[] locFullData
+    ) {
+        this.locFullData = locFullData;
+    }
+
+    /** {@inheritDoc} Binary-searches the local data; returns {@code null} when the key is not found. */
+    @Override public Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException {
+        int idx = Arrays.binarySearch(
+            locFullData,
+            new DistributedMetaStorageHistoryItem(globalKey, null),
+            HISTORY_ITEM_KEY_COMPARATOR
+        );
+
+        if (idx >= 0)
+            return unmarshal ? unmarshal(locFullData[idx].valBytes) : locFullData[idx].valBytes;
+
+        return null;
+    }
+
+    /** {@inheritDoc} Starts at the position found by binary search and stops at the first key without the prefix. */
+    @Override public void iterate(
+        String globalKeyPrefix,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) throws IgniteCheckedException {
+        int idx = Arrays.binarySearch(
+            locFullData,
+            new DistributedMetaStorageHistoryItem(globalKeyPrefix, null),
+            HISTORY_ITEM_KEY_COMPARATOR
+        );
+
+        if (idx < 0)
+            idx = -1 - idx; // Not found: convert the result to the insertion point.
+
+        for (; idx < locFullData.length && locFullData[idx].key.startsWith(globalKeyPrefix); ++idx) {
+            DistributedMetaStorageHistoryItem item = locFullData[idx];
+
+            cb.accept(item.key, unmarshal ? unmarshal(item.valBytes) : item.valBytes);
+        }
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void write(String globalKey, byte[] valBytes) {
+        throw new UnsupportedOperationException("write");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void onUpdateMessage(
+        DistributedMetaStorageHistoryItem histItem,
+        Serializable val,
+        boolean notifyListeners
+    ) {
+        throw new UnsupportedOperationException("onUpdateMessage");
+    }
+
+    /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+    @Override public void removeHistoryItem(long ver) {
+        throw new UnsupportedOperationException("removeHistoryItem");
+    }
+
+    /** {@inheritDoc} Returns the internal array itself, not a copy. */
+    @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+        return locFullData;
+    }
+
+    /** @return Version set by {@link #readInitialData}, or {@code null} if it has not been called. */
+    public DistributedMetaStorageVersion version() {
+        return ver;
+    }
+
+    /** Reads version and local data from {@code metastorage}; an item past the stored version is added as deferred. */
+    public DistributedMetaStorageVersion readInitialData(
+        ReadOnlyMetastorage metastorage,
+        StartupExtras startupExtras
+    ) throws IgniteCheckedException {
+        if (metastorage.readRaw(cleanupGuardKey()) != null) { // Guard key is present: stored data is ignored.
+            ver = DistributedMetaStorageVersion.INITIAL_VERSION;
+
+            locFullData = EMPTY_ARRAY;
+
+            return ver;
+        }
+        else {
+            DistributedMetaStorageVersion storedVer =
+                (DistributedMetaStorageVersion)metastorage.read(historyVersionKey());
+
+            if (storedVer == null) { // No stored version: start from the initial version with no data.
+                ver = DistributedMetaStorageVersion.INITIAL_VERSION;
+
+                locFullData = EMPTY_ARRAY;
+
+                return ver;
+            }
+            else {
+                ver = storedVer;
+
+                DistributedMetaStorageHistoryItem histItem =
+                    (DistributedMetaStorageHistoryItem)metastorage.read(historyItemKey(storedVer.id + 1));
+
+                DistributedMetaStorageHistoryItem[] firstToWrite = {null}; // Holder that the lambda below can reset.
+
+                if (histItem != null) { // A history item one step past the stored version exists.
+                    ver = storedVer.nextVersion(histItem);
+
+                    startupExtras.deferredUpdates.add(histItem);
+                }
+                else {
+                    histItem = (DistributedMetaStorageHistoryItem)metastorage.read(historyItemKey(storedVer.id));
+
+                    if (histItem != null) {
+                        byte[] valBytes = metastorage.readRaw(localKey(histItem.key));
+
+                        if (!Arrays.equals(valBytes, histItem.valBytes)) // Stored value differs from the history item.
+                            firstToWrite[0] = histItem;
+                    }
+                }
+
+                List<DistributedMetaStorageHistoryItem> locFullDataList = new ArrayList<>();
+
+                metastorage.iterate(
+                    localKeyPrefix(),
+                    (key, val) -> {
+                        String globalKey = globalKey(key);
+
+                        if (firstToWrite[0] != null && firstToWrite[0].key.equals(globalKey)) {
+                            if (firstToWrite[0].valBytes != null) // Null bytes: the key is left out entirely.
+                                locFullDataList.add(firstToWrite[0]);
+
+                            firstToWrite[0] = null; // Same key: the pending item took the place of the stored value.
+                        }
+                        else if (firstToWrite[0] != null && firstToWrite[0].key.compareTo(globalKey) < 0) {
+                            if (firstToWrite[0].valBytes != null)
+                                locFullDataList.add(firstToWrite[0]);
+
+                            firstToWrite[0] = null; // Pending key sorts before the current key: it was handled first.
+
+                            locFullDataList.add(new DistributedMetaStorageHistoryItem(globalKey, (byte[])val));
+                        }
+                        else
+                            locFullDataList.add(new DistributedMetaStorageHistoryItem(globalKey, (byte[])val));
+                    },
+                    false
+                );
+
+                if (firstToWrite[0] != null && firstToWrite[0].valBytes != null) { // Not placed during iteration.
+                    locFullDataList.add(
+                        new DistributedMetaStorageHistoryItem(firstToWrite[0].key, firstToWrite[0].valBytes)
+                    );
+                }
+
+                locFullData = locFullDataList.toArray(EMPTY_ARRAY);
+
+                return storedVer; // NOTE(review): 'ver' may be one update ahead of this value - confirm intended.
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/StartupExtras.java
similarity index 64%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/StartupExtras.java
index ab48afc..58f8f10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/StartupExtras.java
@@ -14,19 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
-import java.io.Serializable;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.processors.metastorage.persistence;
 
-/**
- *
- */
-public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
+import java.util.ArrayList;
+import java.util.List;
+
+/** Mutable holder of extra state handed to {@code WritableDistributedMetaStorageBridge#restore}. */
+@SuppressWarnings("PublicField")
+class StartupExtras {
     /** */
-    public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+    public List<DistributedMetaStorageHistoryItem> deferredUpdates = new ArrayList<>(); // presumably pending updates
 
     /** */
-    public void remove(@NotNull String key) throws IgniteCheckedException;
+    public DistributedMetaStorageClusterNodeData fullNodeData; // Non-null makes restore() replace local data with it.
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/WritableDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/WritableDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..37c6181
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/WritableDistributedMetaStorageBridge.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.COMMON_KEY_PREFIX;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.cleanupGuardKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.globalKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyVersionKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKeyPrefix;
+
+/** Bridge that keeps distributed metastorage data, history and version in a local {@link ReadWriteMetastorage}. */
+class WritableDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+    /** Placeholder payload for the cleanup guard key: {@link #restore} only checks that the key is present. */
+    private static final byte[] DUMMY_VALUE = {};
+
+    /** Owning distributed metastorage; its version and history cache are updated from here. */
+    private final DistributedMetaStorageImpl dms;
+
+    /** Local metastorage that every read and write is delegated to. */
+    private final ReadWriteMetastorage metastorage;
+
+    /** Creates a bridge for {@code dms} backed by the local {@code metastorage}. */
+    public WritableDistributedMetaStorageBridge(DistributedMetaStorageImpl dms, ReadWriteMetastorage metastorage) {
+        this.dms = dms;
+        this.metastorage = metastorage;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException {
+        return unmarshal ? metastorage.read(localKey(globalKey)) : metastorage.readRaw(localKey(globalKey));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void iterate(
+        String globalKeyPrefix,
+        BiConsumer<String, ? super Serializable> cb,
+        boolean unmarshal
+    ) throws IgniteCheckedException {
+        metastorage.iterate(
+            localKeyPrefix() + globalKeyPrefix,
+            (key, val) -> cb.accept(globalKey(key), val), // Callers get global keys, not local storage keys.
+            unmarshal
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(String globalKey, @Nullable byte[] valBytes) throws IgniteCheckedException {
+        if (valBytes == null) // Null bytes mean removal.
+            metastorage.remove(localKey(globalKey));
+        else
+            metastorage.writeRaw(localKey(globalKey), valBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdateMessage(
+        DistributedMetaStorageHistoryItem histItem,
+        Serializable val,
+        boolean notifyListeners
+    ) throws IgniteCheckedException {
+        metastorage.write(historyItemKey(dms.ver.id + 1), histItem); // Stored under the id of the next version.
+
+        dms.ver = dms.ver.nextVersion(histItem);
+
+        metastorage.write(historyVersionKey(), dms.ver);
+
+        if (notifyListeners) // The value under histItem.key is not written by this method, only read.
+            dms.notifyListeners(histItem.key, read(histItem.key, true), val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeHistoryItem(long ver) throws IgniteCheckedException {
+        metastorage.remove(historyItemKey(ver));
+    }
+
+    /** {@inheritDoc} */
+    @Override public DistributedMetaStorageHistoryItem[] localFullData() throws IgniteCheckedException {
+        List<DistributedMetaStorageHistoryItem> locFullData = new ArrayList<>();
+
+        metastorage.iterate(
+            localKeyPrefix(),
+            (key, val) -> locFullData.add(new DistributedMetaStorageHistoryItem(globalKey(key), (byte[])val)),
+            false // Raw mode: the byte[] cast above relies on it.
+        );
+
+        return locFullData.toArray(EMPTY_ARRAY);
+    }
+
+    /** Finishes an interrupted cleanup or applies received full node data, then ensures a version is stored. */
+    public void restore(StartupExtras startupExtras) throws IgniteCheckedException {
+        assert startupExtras != null;
+
+        String cleanupGuardKey = cleanupGuardKey();
+
+        if (metastorage.readRaw(cleanupGuardKey) != null || startupExtras.fullNodeData != null) {
+            metastorage.writeRaw(cleanupGuardKey, DUMMY_VALUE); // Marks the cleanup below as in progress.
+
+            Set<String> allKeys = new HashSet<>();
+            metastorage.iterate(COMMON_KEY_PREFIX, (key, val) -> allKeys.add(key), false);
+            allKeys.remove(cleanupGuardKey); // Guard may match the prefix; it must stay until the rewrite is done.
+
+            for (String key : allKeys)
+                metastorage.remove(key);
+
+            if (startupExtras.fullNodeData != null) {
+                DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+                dms.ver = fullNodeData.ver;
+
+                dms.clearHistoryCache();
+
+                for (DistributedMetaStorageHistoryItem item : fullNodeData.fullData)
+                    metastorage.writeRaw(localKey(item.key), item.valBytes);
+
+                for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                    DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+
+                    long histItemVer = dms.ver.id + i + 1 - len; // The last history item gets version dms.ver.id.
+
+                    metastorage.write(historyItemKey(histItemVer), histItem);
+
+                    dms.addToHistoryCache(histItemVer, histItem);
+                }
+
+                metastorage.write(historyVersionKey(), dms.ver);
+            }
+
+            metastorage.remove(cleanupGuardKey); // Rewrite finished: drop the guard explicitly.
+        }
+
+        DistributedMetaStorageVersion storedVer = (DistributedMetaStorageVersion)metastorage.read(historyVersionKey());
+
+        if (storedVer == null) // No version stored: write the initial one.
+            metastorage.write(historyVersionKey(), DistributedMetaStorageVersion.INITIAL_VERSION);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
index 6db7fa5..5e48547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -34,10 +35,13 @@ import org.jetbrains.annotations.NotNull;
  */
 public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
     /** */
-    private List<MetastorageLifecycleListener> metastorageListeners = new ArrayList<>();
+    private final List<MetastorageLifecycleListener> metastorageListeners = new ArrayList<>();
 
     /** */
-    private List<DatabaseLifecycleListener> databaseListeners = new ArrayList<>();
+    private final List<DistributedMetastorageLifecycleListener> distributedMetastorageListeners = new ArrayList<>();
+
+    /** */
+    private final List<DatabaseLifecycleListener> dbListeners = new ArrayList<>();
 
 
     /**
@@ -61,15 +65,28 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
     }
 
     /** */
+    public void registerDistributedMetastorageListener(@NotNull DistributedMetastorageLifecycleListener lsnr) {
+        if (lsnr == null) // Fail fast with a descriptive message instead of storing null.
+            throw new NullPointerException("Global metastorage subscriber should be not-null.");
+
+        distributedMetastorageListeners.add(lsnr);
+    }
+
+    /** Returns the internal list of registered distributed metastorage listeners itself (not a copy). */
+    public List<DistributedMetastorageLifecycleListener> getDistributedMetastorageSubscribers() {
+        return distributedMetastorageListeners;
+    }
+
+    /** */
     public void registerDatabaseListener(@NotNull DatabaseLifecycleListener databaseListener) {
         if (databaseListener == null)
             throw new NullPointerException("Database subscriber should be not-null.");
 
-        databaseListeners.add(databaseListener);
+        dbListeners.add(databaseListener);
     }
 
     /** */
     public List<DatabaseLifecycleListener> getDatabaseListeners() {
-        return databaseListeners;
+        return dbListeners;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
index 742925d..8f7edd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
@@ -68,6 +68,9 @@ import org.jetbrains.annotations.Nullable;
  * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
  */
 public class JdkMarshaller extends AbstractNodeNameAwareMarshaller {
+    /** Default instance. */
+    public static final JdkMarshaller DEFAULT = new JdkMarshaller();
+
     /** Class name filter. */
     private final IgnitePredicate<String> clsFilter;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java
index cfbe642..a204297 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java
@@ -123,11 +123,11 @@ public class EncryptedCacheDestroyTest extends AbstractEncryptionTest {
         if (keyShouldBeEmpty) {
             assertNull(encKey);
 
-            assertNull(metaStore.getData(ENCRYPTION_KEY_PREFIX + grpId));
+            assertNull(metaStore.readRaw(ENCRYPTION_KEY_PREFIX + grpId));
         } else {
             assertNotNull(encKey);
 
-            assertNotNull(metaStore.getData(ENCRYPTION_KEY_PREFIX + grpId));
+            assertNotNull(metaStore.readRaw(ENCRYPTION_KEY_PREFIX + grpId));
         }
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
index 32277db..d02bb5e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -37,6 +39,8 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -111,7 +115,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
 
                 metaStorage.remove(key);
 
-                metaStorage.putData(key, arr/*b.toString().getBytes()*/);
+                metaStorage.writeRaw(key, arr/*b.toString().getBytes()*/);
             }
         }
         finally {
@@ -150,7 +154,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
 
                 metaStorage.remove(key);
 
-                metaStorage.putData(key, arr);
+                metaStorage.writeRaw(key, arr);
             }
         }
         finally {
@@ -170,7 +174,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
         for (Iterator<IgniteBiTuple<String, byte[]>> it = generateTestData(size, from).iterator(); it.hasNext(); ) {
             IgniteBiTuple<String, byte[]> d = it.next();
 
-            metaStorage.putData(d.getKey(), d.getValue());
+            metaStorage.writeRaw(d.getKey(), d.getValue());
 
             res.put(d.getKey(), d.getValue());
         }
@@ -314,7 +318,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
 
             try {
                 for (Map.Entry<String, byte[]> v : testData.entrySet())
-                    metaStorage.putData(v.getKey(), v.getValue());
+                    metaStorage.writeRaw(v.getKey(), v.getValue());
             }
             finally {
                 db.checkpointReadUnlock();
@@ -483,6 +487,69 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
         verifyKeys(grid(1), KEYS_CNT, KEY_PREFIX, UPDATED_VAL_PREFIX);
     }
 
+    /** Checks that iteration after {@code applyUpdate} calls merges them with stored entries in key order.
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testReadOnlyIterationOrder() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        MetaStorage storage = ignite.context().cache().context().database().metaStorage();
+
+        ignite.context().cache().context().database().checkpointReadLock();
+
+        try {
+            storage.write("a", 0); // Sorts before the "pref" keys; must not show up in the iteration.
+
+            storage.write("z", 0); // Sorts after the "pref" keys; must not show up in the iteration.
+
+            storage.write("pref-1", 1);
+
+            storage.write("pref-3", 3);
+
+            storage.write("pref-5", 5);
+
+            storage.write("pref-7", 7);
+
+            GridTestUtils.setFieldValue(storage, "readOnly", true); // Flip the private flag via reflection.
+
+            storage.applyUpdate("pref-0", JdkMarshaller.DEFAULT.marshal(0)); // New key before all stored ones.
+
+            storage.applyUpdate("pref-1", JdkMarshaller.DEFAULT.marshal(10)); // Overrides the stored value 1.
+
+            storage.applyUpdate("pref-4", JdkMarshaller.DEFAULT.marshal(4)); // New key between stored ones.
+
+            storage.applyUpdate("pref-5", null); // Null value: "pref-5" is absent from the expected keys below.
+
+            storage.applyUpdate("pref-8", JdkMarshaller.DEFAULT.marshal(8)); // New key after all stored ones.
+
+            List<String> keys = new ArrayList<>();
+
+            List<Integer> values = new ArrayList<>();
+
+            storage.iterate("pref", (key, val) -> {
+                keys.add(key);
+
+                values.add((Integer)val);
+            }, true);
+
+            assertEqualsCollections(
+                Arrays.asList("pref-0", "pref-1", "pref-3", "pref-4", "pref-7", "pref-8"),
+                keys
+            );
+
+            assertEqualsCollections(
+                Arrays.asList(0, 10, 3, 4, 7, 8),
+                values
+            );
+        }
+        finally {
+            ignite.context().cache().context().database().checkpointReadUnlock();
+        }
+    }
+
     /** */
     private void loadKeys(IgniteEx ig,
         byte keysCnt,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
new file mode 100644
index 0000000..d3c6cc3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES;
+
+/**
+ * Test for {@link DistributedMetaStorageImpl} with enabled persistence.
+ */
+@RunWith(JUnit4.class)
+public class DistributedMetaStoragePersistentTest extends DistributedMetaStorageTest {
+    /** {@inheritDoc} */
+    @Override protected boolean isPersistent() {
+        return true; // This subclass is the persistence-enabled variant of the test.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void before() throws Exception {
+        super.before();
+
+        cleanPersistenceDir(); // Clean the persistence directory after the base setup.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void after() throws Exception {
+        super.after();
+
+        cleanPersistenceDir(); // Clean the persistence directory after the base teardown.
+    }
+
+    /** Checks that a written value is still readable after the node is restarted and reactivated.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRestart() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        ignite.context().distributedMetastorage().write("key", "value");
+
+        stopGrid(0);
+
+        ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        assertEquals("value", ignite.context().distributedMetastorage().read("key"));
+    }
+
+    /** Checks that a node that was down during one update has equal data after it joins the updated node.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJoinDirtyNode() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        startGrid(1);
+
+        ignite.cluster().active(true);
+
+        ignite.context().distributedMetastorage().write("key1", "value1"); // Both nodes are up for this write.
+
+        stopGrid(1);
+
+        stopGrid(0);
+
+        ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        ignite.context().distributedMetastorage().write("key2", "value2"); // Node 1 is down for this write.
+
+        IgniteEx newNode = startGrid(1);
+
+        assertEquals("value1", newNode.context().distributedMetastorage().read("key1"));
+
+        assertEquals("value2", newNode.context().distributedMetastorage().read("key2"));
+
+        assertDistributedMetastoragesAreEqual(ignite, newNode);
+    }
+
+    /** Checks that a node that was down during two updates catches up when the history size property is 0.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJoinDirtyNodeFullData() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0"); // presumably no history — confirm
+
+        IgniteEx ignite = startGrid(0);
+
+        startGrid(1);
+
+        ignite.cluster().active(true);
+
+        ignite.context().distributedMetastorage().write("key1", "value1"); // Both nodes are up for this write.
+
+        stopGrid(1);
+
+        stopGrid(0);
+
+        ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        ignite.context().distributedMetastorage().write("key2", "value2"); // Node 1 is down for this write.
+
+        ignite.context().distributedMetastorage().write("key3", "value3"); // Node 1 is down for this write too.
+
+        IgniteEx newNode = startGrid(1);
+
+        assertEquals("value1", newNode.context().distributedMetastorage().read("key1"));
+
+        assertEquals("value2", newNode.context().distributedMetastorage().read("key2"));
+
+        assertEquals("value3", newNode.context().distributedMetastorage().read("key3"));
+
+        assertDistributedMetastoragesAreEqual(ignite, newNode);
+    }
+
+    /** Checks that a node started first with older data catches up once the node with newer data joins it.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJoinNodeWithLongerHistory() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        startGrid(1);
+
+        ignite.cluster().active(true);
+
+        ignite.context().distributedMetastorage().write("key1", "value1");
+
+        stopGrid(1);
+
+        ignite.context().distributedMetastorage().write("key2", "value2"); // Only node 0 sees this write.
+
+        stopGrid(0);
+
+        ignite = startGrid(1); // The node with older data starts first.
+
+        startGrid(0);
+
+        awaitPartitionMapExchange();
+
+        assertEquals("value1", ignite.context().distributedMetastorage().read("key1"));
+
+        assertEquals("value2", ignite.context().distributedMetastorage().read("key2"));
+
+        assertDistributedMetastoragesAreEqual(ignite, grid(0));
+    }
+
+    /** Ignored. A node that missed two updates starts first, with the history size property set to 0.
+     * @throws Exception If failed.
+     */
+    @Test @Ignore
+    public void testJoinNodeWithoutEnoughHistory() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0"); // presumably no history — confirm
+
+        IgniteEx ignite = startGrid(0);
+
+        startGrid(1);
+
+        ignite.cluster().active(true);
+
+        ignite.context().distributedMetastorage().write("key1", "value1");
+
+        stopGrid(1);
+
+        ignite.context().distributedMetastorage().write("key2", "value2"); // Only node 0 sees this write.
+
+        ignite.context().distributedMetastorage().write("key3", "value3"); // Only node 0 sees this write.
+
+        stopGrid(0);
+
+        ignite = startGrid(1); // The node with older data starts first.
+
+        startGrid(0);
+
+        awaitPartitionMapExchange();
+
+        assertEquals("value1", ignite.context().distributedMetastorage().read("key1"));
+
+        assertEquals("value2", ignite.context().distributedMetastorage().read("key2"));
+
+        assertEquals("value3", ignite.context().distributedMetastorage().read("key3"));
+
+        assertDistributedMetastoragesAreEqual(ignite, grid(0));
+    }
+
+    /** Checks that the same key holds independent values in the local and in the distributed metastorage.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNamesCollision() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCacheDatabaseSharedManager dbSharedMgr = ignite.context().cache().context().database();
+
+        MetaStorage locMetastorage = dbSharedMgr.metaStorage();
+
+        DistributedMetaStorage distributedMetastorage = ignite.context().distributedMetastorage();
+
+        dbSharedMgr.checkpointReadLock();
+
+        try {
+            locMetastorage.write("key", "localValue");
+        }
+        finally {
+            dbSharedMgr.checkpointReadUnlock();
+        }
+
+        distributedMetastorage.write("key", "globalValue"); // Same key name as the local write above.
+
+        dbSharedMgr.checkpointReadLock();
+
+        try {
+            assertEquals("localValue", locMetastorage.read("key")); // Not overwritten by the distributed write.
+        }
+        finally {
+            dbSharedMgr.checkpointReadUnlock();
+        }
+
+        assertEquals("globalValue", distributedMetastorage.read("key"));
+    }
+
+    /** Checks that all storages end up equal after writes on node 0 while the other nodes keep restarting.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUnstableTopology() throws Exception {
+        int cnt = 8;
+
+        startGridsMultiThreaded(cnt);
+
+        grid(0).cluster().active(true);
+
+        stopGrid(0);
+
+        startGrid(0);
+
+        AtomicInteger gridIdxCntr = new AtomicInteger(0);
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+            int gridIdx = gridIdxCntr.incrementAndGet(); // Yields 1..cnt-1: node 0 is never restarted here.
+
+            try {
+                while (!stop.get()) {
+                    stopGrid(gridIdx, true);
+
+                    Thread.sleep(10L);
+
+                    startGrid(gridIdx);
+
+                    Thread.sleep(10L);
+                }
+            }
+            catch (Exception e) {
+                log.error(e.getMessage(), e); // NOTE(review): the failure is only logged, the test goes on.
+            }
+        }, cnt - 1);
+
+        long start = System.currentTimeMillis();
+
+        long duration = GridTestUtils.SF.applyLB(30_000, 5_000);
+
+        try {
+            for (int i = 0; System.currentTimeMillis() < start + duration; i++) {
+                metastorage(0).write(
+                    "key" + i, Integer.toString(ThreadLocalRandom.current().nextInt(1000))
+                );
+            }
+        }
+        finally {
+            stop.set(true);
+
+            fut.get(); // Wait for the restart threads to finish.
+        }
+
+        awaitPartitionMapExchange();
+
+        Thread.sleep(3_000L); // Remove later.
+
+        for (int i = 0; i < cnt; i++) {
+            DistributedMetaStorage distributedMetastorage = metastorage(i);
+
+            assertNull(U.field(distributedMetastorage, "startupExtras"));
+        }
+
+        for (int i = 1; i < cnt; i++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+    }
+
+    /** Checks that storages are equal when nodes stopped after different writes are restarted in index order.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWrongStartOrder1() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0"); // presumably no history — confirm
+
+        int cnt = 4;
+
+        startGridsMultiThreaded(cnt);
+
+        grid(0).cluster().active(true);
+
+        metastorage(2).write("key1", "value1");
+
+        stopGrid(2); // Stopped after key1.
+
+        metastorage(1).write("key2", "value2");
+
+        stopGrid(1); // Stopped after key2.
+
+        metastorage(0).write("key3", "value3");
+
+        stopGrid(0); // Stopped after key3.
+
+        metastorage(3).write("key4", "value4");
+
+        stopGrid(3); // Last one alive: stopped after all four writes.
+
+
+        for (int i = 0; i < cnt; i++)
+            startGrid(i);
+
+        awaitPartitionMapExchange();
+
+        for (int i = 1; i < cnt; i++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+    }
+
+    /** Checks that storages are equal when node 0 first joins node 1, which then restarts with the others.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWrongStartOrder2() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0"); // presumably no history — confirm
+
+        int cnt = 6;
+
+        startGridsMultiThreaded(cnt);
+
+        grid(0).cluster().active(true);
+
+        metastorage(4).write("key1", "value1");
+
+        stopGrid(4); // Stopped after key1.
+
+        metastorage(3).write("key2", "value2");
+
+        stopGrid(3); // Stopped after key2.
+
+        metastorage(0).write("key3", "value3");
+
+        stopGrid(0); // Stopped after key3.
+
+        stopGrid(2); // Stopped after key3 as well: no write in between.
+
+        metastorage(1).write("key4", "value4");
+
+        stopGrid(1); // Stopped after key4.
+
+        metastorage(5).write("key5", "value5");
+
+        stopGrid(5); // Last one alive: stopped after all five writes.
+
+
+        startGrid(1);
+
+        startGrid(0);
+
+        stopGrid(1);
+
+        for (int i = 1; i < cnt; i++)
+            startGrid(i);
+
+        awaitPartitionMapExchange();
+
+        for (int i = 1; i < cnt; i++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+    }
+
+    /** Checks that storages are equal when node 0 first joins node 1, which then restarts with the others.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWrongStartOrder3() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0"); // presumably no history — confirm
+
+        int cnt = 5;
+
+        startGridsMultiThreaded(cnt);
+
+        grid(0).cluster().active(true);
+
+        metastorage(3).write("key1", "value1");
+
+        stopGrid(3); // Stopped after key1.
+
+        stopGrid(0); // Stopped after key1 as well: no write in between.
+
+        metastorage(2).write("key2", "value2");
+
+        stopGrid(2); // Stopped after key2.
+
+        metastorage(1).write("key3", "value3");
+
+        stopGrid(1); // Stopped after key3.
+
+        metastorage(4).write("key4", "value4");
+
+        stopGrid(4); // Last one alive: stopped after all four writes.
+
+
+        startGrid(1);
+
+        startGrid(0);
+
+        stopGrid(1);
+
+        for (int i = 1; i < cnt; i++)
+            startGrid(i);
+
+        awaitPartitionMapExchange();
+
+        for (int i = 1; i < cnt; i++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+    }
+
+    /** Checks that storages are equal when node 0 first joins node 2, which then restarts with the others.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWrongStartOrder4() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0"); // presumably no history — confirm
+
+        int cnt = 6;
+
+        startGridsMultiThreaded(cnt);
+
+        grid(0).cluster().active(true);
+
+        metastorage(4).write("key1", "value1");
+
+        stopGrid(4); // Stopped after key1.
+
+        stopGrid(0); // Stopped after key1 as well: no write in between.
+
+        metastorage(3).write("key2", "value2");
+
+        stopGrid(3); // Stopped after key2.
+
+        metastorage(2).write("key3", "value3");
+
+        stopGrid(2); // Stopped after key3.
+
+        metastorage(1).write("key4", "value4");
+
+        stopGrid(1); // Stopped after key4.
+
+        metastorage(5).write("key5", "value5");
+
+        stopGrid(5); // Last one alive: stopped after all five writes.
+
+
+        startGrid(2);
+
+        startGrid(0);
+
+        stopGrid(2);
+
+        for (int i = 1; i < cnt; i++)
+            startGrid(i);
+
+        awaitPartitionMapExchange();
+
+        for (int i = 1; i < cnt; i++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+    }
+
+    /** Checks that write and remove fail with {@link IllegalStateException} when the cluster was not activated.
+     * @throws Exception If failed.
+     */
+    @Test @SuppressWarnings("ThrowableNotThrown")
+    public void testInactiveClusterWrite() throws Exception {
+        startGrid(0); // No activation call follows on purpose.
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> {
+            metastorage(0).write("key", "value");
+
+            return null;
+        }, IllegalStateException.class, "Ignite cluster is not active");
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> {
+            metastorage(0).remove("key");
+
+            return null;
+        }, IllegalStateException.class, "Ignite cluster is not active");
+    }
+
+    /** Checks that a node is rejected on join if each node wrote the same key while the other one was down.
+     * @throws Exception If failed.
+     */
+    @Test @SuppressWarnings("ThrowableNotThrown")
+    public void testConflictingData() throws Exception {
+        startGrid(0);
+
+        startGrid(1);
+
+        grid(0).cluster().active(true);
+
+        stopGrid(0);
+
+        metastorage(1).write("key", "value1"); // Node 0 is down for this write.
+
+        stopGrid(1);
+
+        startGrid(0);
+
+        grid(0).cluster().active(true);
+
+        metastorage(0).write("key", "value2"); // Node 1 is down for this write.
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> startGrid(1),
+            IgniteSpiException.class,
+            "Joining node has conflicting distributed metastorage data"
+        );
+    }
+
+    /**
+     * Checks that the last written (and greatest) key survives removal of its presumed node-local record.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFailover1() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+        startGrid(0);
+        startGrid(1);
+
+        grid(0).cluster().active(true);
+
+        stopGrid(1);
+
+        metastorage(0).write("key1", "val1");
+        metastorage(0).write("key9", "val9");
+
+        // Tamper with the node-local metastorage directly; the prefixed name presumably maps to the key above.
+        IgniteCacheDatabaseSharedManager dbSharedMgr = grid(0).context().cache().context().database();
+
+        dbSharedMgr.checkpointReadLock();
+
+        try {
+            dbSharedMgr.metaStorage().remove("\u0000key-key9");
+        }
+        finally {
+            dbSharedMgr.checkpointReadUnlock();
+        }
+
+        stopGrid(0);
+
+        startGrid(0);
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        assertEquals("val9", metastorage(1).read("key9"));
+
+        assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+    }
+
+    /**
+     * Checks that the last written (but smallest) key survives removal of its presumed node-local record.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFailover2() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+        startGrid(0);
+        startGrid(1);
+
+        grid(0).cluster().active(true);
+
+        stopGrid(1);
+
+        metastorage(0).write("key9", "val9");
+        metastorage(0).write("key1", "val1");
+
+        // Tamper with the node-local metastorage directly; the prefixed name presumably maps to the key above.
+        IgniteCacheDatabaseSharedManager dbSharedMgr = grid(0).context().cache().context().database();
+
+        dbSharedMgr.checkpointReadLock();
+
+        try {
+            dbSharedMgr.metaStorage().remove("\u0000key-key1");
+        }
+        finally {
+            dbSharedMgr.checkpointReadUnlock();
+        }
+
+        stopGrid(0);
+
+        startGrid(0);
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        assertEquals("val1", metastorage(1).read("key1"));
+
+        assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+    }
+
+    /**
+     * Overwrites the presumed node-local record of the last written key with a wrong value and checks that
+     * the originally written value is read after the cluster is restarted.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testFailover3() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+        startGrid(0);
+        startGrid(1);
+
+        grid(0).cluster().active(true);
+
+        stopGrid(1);
+
+        metastorage(0).write("key1", "val1");
+        metastorage(0).write("key9", "val9");
+        metastorage(0).write("key5", "val5");
+
+        // Tamper with the node-local metastorage directly; the prefixed name presumably maps to the key above.
+        IgniteCacheDatabaseSharedManager dbSharedMgr = grid(0).context().cache().context().database();
+
+        dbSharedMgr.checkpointReadLock();
+
+        try {
+            dbSharedMgr.metaStorage().write("\u0000key-key5", "wrong-value");
+        }
+        finally {
+            dbSharedMgr.checkpointReadUnlock();
+        }
+
+        stopGrid(0);
+
+        startGrid(0);
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        assertEquals("val5", metastorage(1).read("key5"));
+
+        assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
new file mode 100644
index 0000000..8dbd71a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES;
+
+/**
+ * Test for {@link DistributedMetaStorageImpl} with disabled persistence.
+ */
+@RunWith(JUnit4.class)
+public class DistributedMetaStorageTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+
+        igniteCfg.setConsistentId(igniteInstanceName);
+
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration().setPersistenceEnabled(isPersistent());
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+        igniteCfg.setDataStorageConfiguration(dsCfg);
+
+        return igniteCfg;
+    }
+
+    /**
+     * @return {@code true} if nodes must start with persistence enabled in the default data region, else {@code false}.
+     */
+    protected boolean isPersistent() {
+        return false;
+    }
+
+    /** {@inheritDoc} Always returns a new {@link StopNodeFailureHandler}. */
+    @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+        return new StopNodeFailureHandler();
+    }
+
+    /** Stops all grids before each test. */
+    @Before
+    public void before() throws Exception {
+        stopAllGrids();
+    }
+
+    /** Stops all grids and clears the history size system property after each test. */
+    @After
+    public void after() throws Exception {
+        stopAllGrids();
+
+        System.clearProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES);
+    }
+
+    /**
+     * Checks read, write and remove of a single key on one activated node.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleNode() throws Exception {
+        startGrid(0).cluster().active(true);
+
+        DistributedMetaStorage storage = metastorage(0);
+
+        assertNull(storage.read("key"));
+
+        storage.write("key", "value");
+
+        assertEquals("value", storage.read("key"));
+
+        storage.remove("key");
+
+        assertNull(storage.read("key"));
+    }
+
+    /**
+     * Writes a random key from every node in turn and checks that each node reads the written value.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleNodes() throws Exception {
+        int nodes = 4;
+
+        startGridsMultiThreaded(nodes);
+        grid(0).cluster().active(true);
+
+        for (int writer = 0; writer < nodes; writer++) {
+            String k = UUID.randomUUID().toString();
+            String v = UUID.randomUUID().toString();
+
+            metastorage(writer).write(k, v);
+
+            for (int reader = 0; reader < nodes; reader++)
+                assertEquals(writer + " " + reader, v, metastorage(reader).read(k));
+        }
+
+        for (int idx = 1; idx < nodes; idx++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(idx));
+    }
+
+    /**
+     * Registers a listener on every node and checks that one write triggers as many calls as there are nodes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testListenersOnWrite() throws Exception {
+        int nodes = 4;
+
+        startGridsMultiThreaded(nodes);
+
+        grid(0).cluster().active(true);
+
+        AtomicInteger calls = new AtomicInteger();
+
+        for (int idx = 0; idx < nodes; idx++) {
+            metastorage(idx).listen(key -> key.startsWith("k"), (key, prev, cur) -> {
+                assertNull(prev);
+
+                assertEquals("value", cur);
+
+                calls.incrementAndGet();
+            });
+        }
+
+        metastorage(0).write("key", "value");
+
+        assertEquals(nodes, calls.get());
+
+        for (int idx = 1; idx < nodes; idx++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(idx));
+    }
+
+    /**
+     * Registers a listener on every node and checks that one remove triggers as many calls as there are nodes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testListenersOnRemove() throws Exception {
+        int nodes = 4;
+
+        startGridsMultiThreaded(nodes);
+
+        grid(0).cluster().active(true);
+
+        metastorage(0).write("key", "value");
+
+        AtomicInteger calls = new AtomicInteger();
+
+        for (int idx = 0; idx < nodes; idx++) {
+            metastorage(idx).listen(key -> key.startsWith("k"), (key, prev, cur) -> {
+                assertEquals("value", prev);
+
+                assertNull(cur);
+
+                calls.incrementAndGet();
+            });
+        }
+
+        metastorage(0).remove("key");
+
+        assertEquals(nodes, calls.get());
+
+        for (int idx = 1; idx < nodes; idx++)
+            assertDistributedMetastoragesAreEqual(grid(0), grid(idx));
+    }
+
+    /**
+     * Checks results and visible effects of {@code compareAndSet} and {@code compareAndRemove} on one key.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCas() throws Exception {
+        startGrids(2);
+
+        grid(0).cluster().active(true);
+
+        DistributedMetaStorage storage = metastorage(0);
+
+        // Conditional updates of a missing key fail unless null is the expected value.
+        assertFalse(storage.compareAndSet("key", "expVal", "newVal"));
+        assertNull(storage.read("key"));
+
+        assertFalse(storage.compareAndRemove("key", "expVal"));
+
+        assertTrue(storage.compareAndSet("key", null, "val1"));
+        assertEquals("val1", storage.read("key"));
+
+        assertFalse(storage.compareAndSet("key", null, "val2"));
+        assertEquals("val1", storage.read("key"));
+
+        // An existing value is replaced or removed only when the expected value matches it.
+        assertTrue(storage.compareAndSet("key", "val1", "val3"));
+        assertEquals("val3", storage.read("key"));
+
+        assertFalse(storage.compareAndRemove("key", "val1"));
+        assertEquals("val3", storage.read("key"));
+
+        assertTrue(storage.compareAndRemove("key", "val3"));
+        assertNull(storage.read("key"));
+
+        assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+    }
+
+    /**
+     * Checks that a node started after a write reads the written value and matches the first node's state.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJoinCleanNode() throws Exception {
+        IgniteEx first = startGrid(0);
+        first.cluster().active(true);
+
+        first.context().distributedMetastorage().write("key", "value");
+
+        IgniteEx joined = startGrid(1);
+
+        assertEquals("value", joined.context().distributedMetastorage().read("key"));
+        assertDistributedMetastoragesAreEqual(first, joined);
+    }
+
+    /**
+     * Same join check with the history size property set to {@code 0} (presumably forcing a full data transfer).
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testJoinCleanNodeFullData() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+        IgniteEx first = startGrid(0);
+
+        first.cluster().active(true);
+
+        first.context().distributedMetastorage().write("key1", "value1");
+        first.context().distributedMetastorage().write("key2", "value2");
+
+        IgniteEx joined = startGrid(1);
+
+        assertEquals("value1", metastorage(1).read("key1"));
+        assertEquals("value2", metastorage(1).read("key2"));
+
+        assertDistributedMetastoragesAreEqual(first, joined);
+    }
+
+    /**
+     * Checks that data written before deactivation is readable after re-activation and reaches a node joined meanwhile.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDeactivateActivate() throws Exception {
+        System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+        startGrid(0);
+        grid(0).cluster().active(true);
+
+        metastorage(0).write("key1", "value1");
+        metastorage(0).write("key2", "value2");
+
+        grid(0).cluster().active(false);
+
+        startGrid(1);
+
+        CountDownLatch grid1MetaStorageStartLatch = new CountDownLatch(1);
+
+        grid(1).context().internalSubscriptionProcessor().registerDistributedMetastorageListener(
+            new DistributedMetastorageLifecycleListener() {
+                @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+                    grid1MetaStorageStartLatch.countDown();
+                }
+            }
+        );
+
+        grid(0).cluster().active(true);
+
+        assertEquals("value1", metastorage(0).read("key1"));
+        assertEquals("value2", metastorage(0).read("key2"));
+
+        // await() reports a timeout only through its return value, so the result has to be asserted.
+        assertTrue("Node 1 metastorage is not ready for write", grid1MetaStorageStartLatch.await(1, TimeUnit.SECONDS));
+
+        assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+    }
+
+    /**
+     * @return {@link DistributedMetaStorage} instance of the node returned by {@code grid(i)}.
+     */
+    protected DistributedMetaStorage metastorage(int i) {
+        return grid(i).context().distributedMetastorage();
+    }
+
+    /**
+     * Asserts that two nodes hold the same internal {@link DistributedMetaStorage} state: version, history
+     * cache and full local data. The full data is also expected to be ordered by key.
+     *
+     * @param ignite1 First node.
+     * @param ignite2 Second node.
+     * @throws Exception If failed.
+     */
+    protected void assertDistributedMetastoragesAreEqual(IgniteEx ignite1, IgniteEx ignite2) throws Exception {
+        DistributedMetaStorage storage1 = ignite1.context().distributedMetastorage();
+        DistributedMetaStorage storage2 = ignite2.context().distributedMetastorage();
+
+        Object storageVer1 = U.field(storage1, "ver");
+        Object storageVer2 = U.field(storage2, "ver");
+
+        assertEquals(storageVer1, storageVer2);
+
+        Object hist1 = U.field(storage1, "histCache");
+        Object hist2 = U.field(storage2, "histCache");
+
+        assertEquals(hist1, hist2);
+
+        Method locFullData = U.findNonPublicMethod(DistributedMetaStorageImpl.class, "localFullData");
+
+        Object[] data1 = (Object[])locFullData.invoke(storage1);
+        Object[] data2 = (Object[])locFullData.invoke(storage2);
+
+        assertEqualsCollections(Arrays.asList(data1), Arrays.asList(data2));
+
+        // data2 equals the unsorted data1 (checked above), so equality after sorting data1 proves the order.
+        Arrays.sort(data1, Comparator.comparing(o -> U.field(o, "key")));
+        assertEqualsCollections(Arrays.asList(data1), Arrays.asList(data2));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 7fba2d1..8a31d96 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -53,6 +53,8 @@ import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTes
 import org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheStoreTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodePutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeTinyPutGetTest;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStoragePersistentTest;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageTest;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -170,6 +172,8 @@ public class IgnitePdsTestSuite {
 
         //MetaStorage
         GridTestUtils.addTestIfNeeded(suite, IgniteMetaStorageBasicTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, DistributedMetaStorageTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, DistributedMetaStoragePersistentTest.class, ignoredTests);
     }
 
     /** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 973946d..45ee140 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -1018,7 +1018,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
             sharedCtx0.database().checkpointReadLock();
 
             try {
-                storage0.putData(String.valueOf(i), new byte[] {(byte)(i % 256), 2, 3});
+                storage0.writeRaw(String.valueOf(i), new byte[] {(byte)(i % 256), 2, 3});
             }
             finally {
                 sharedCtx0.database().checkpointReadUnlock();
@@ -1032,7 +1032,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
             sharedCtx1.database().checkpointReadLock();
 
             try {
-                storage1.putData(String.valueOf(i), b1);
+                storage1.writeRaw(String.valueOf(i), b1);
             }
             finally {
                 sharedCtx1.database().checkpointReadUnlock();
@@ -1040,13 +1040,13 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
         }
 
         for (int i = 0; i < cnt; i++) {
-            byte[] d1 = storage0.getData(String.valueOf(i));
+            byte[] d1 = storage0.readRaw(String.valueOf(i));
             assertEquals(3, d1.length);
             assertEquals((byte)(i % 256), d1[0]);
             assertEquals(2, d1[1]);
             assertEquals(3, d1[2]);
 
-            byte[] d2 = storage1.getData(String.valueOf(i));
+            byte[] d2 = storage1.readRaw(String.valueOf(i));
             assertEquals(i + 3, d2.length);
             assertEquals(1, d2[0]);
             assertEquals(2, d2[1]);
@@ -1079,7 +1079,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
             sharedCtx.database().checkpointReadLock();
 
             try {
-                storage.putData(String.valueOf(i), b1);
+                storage.writeRaw(String.valueOf(i), b1);
             }
             finally {
                 sharedCtx.database().checkpointReadUnlock();
@@ -1087,7 +1087,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
         }
 
         for (int i = 0; i < cnt; i++) {
-            byte[] d2 = storage.getData(String.valueOf(i));
+            byte[] d2 = storage.readRaw(String.valueOf(i));
             assertEquals(arraySize, d2.length);
 
             for (int k = 0; k < arraySize; k++) {
@@ -1117,7 +1117,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
             sharedCtx0.database().checkpointReadLock();
 
             try {
-                storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
+                storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
             }
             finally {
                 sharedCtx0.database().checkpointReadUnlock();
@@ -1136,7 +1136,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
         }
 
         for (int i = 10; i < cnt; i++) {
-            byte[] d1 = storage.getData(String.valueOf(i));
+            byte[] d1 = storage.readRaw(String.valueOf(i));
             assertEquals(3, d1.length);
             assertEquals(1, d1[0]);
             assertEquals(2, d1[1]);
@@ -1166,7 +1166,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
                 sharedCtx0.database().checkpointReadLock();
 
                 try {
-                    storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
+                    storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
                 }
                 finally {
                     sharedCtx0.database().checkpointReadUnlock();
@@ -1177,7 +1177,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
                 sharedCtx0.database().checkpointReadLock();
 
                 try {
-                    storage.putData(String.valueOf(i), new byte[] {2, 2, 3, 4});
+                    storage.writeRaw(String.valueOf(i), new byte[] {2, 2, 3, 4});
                 }
                 finally {
                     sharedCtx0.database().checkpointReadUnlock();
@@ -1185,7 +1185,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
             }
 
             for (int i = 0; i < cnt; i++) {
-                byte[] d1 = storage.getData(String.valueOf(i));
+                byte[] d1 = storage.readRaw(String.valueOf(i));
                 assertEquals(4, d1.length);
                 assertEquals(2, d1[0]);
                 assertEquals(2, d1[1]);
@@ -1218,7 +1218,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
             sharedCtx0.database().checkpointReadLock();
 
             try {
-                storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
+                storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
             }
             finally {
                 sharedCtx0.database().checkpointReadUnlock();
@@ -1226,7 +1226,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
         }
 
         for (int i = 0; i < cnt; i++) {
-            byte[] value = storage.getData(String.valueOf(i));
+            byte[] value = storage.readRaw(String.valueOf(i));
             assert value != null;
             assert value.length == 3;
         }
@@ -1244,7 +1244,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
         assert storage != null;
 
         for (int i = 0; i < cnt; i++) {
-            byte[] value = storage.getData(String.valueOf(i));
+            byte[] value = storage.readRaw(String.valueOf(i));
             assert value != null;
         }
     }