You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2019/01/16 21:47:32 UTC
[ignite] branch master updated: IGNITE-10640 Create cluster-wide
MetaStorage analogue - Fixes #5637.
This is an automated email from the ASF dual-hosted git repository.
dgovorukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a2e1230 IGNITE-10640 Create cluster-wide MetaStorage analogue - Fixes #5637.
a2e1230 is described below
commit a2e1230aeec734aa0b09cc5688e266c645666ada
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Dec 25 14:01:50 2018 +0300
IGNITE-10640 Create cluster-wide MetaStorage analogue - Fixes #5637.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
---
.../org/apache/ignite/IgniteSystemProperties.java | 6 +
.../org/apache/ignite/internal/GridComponent.java | 5 +-
.../apache/ignite/internal/GridKernalContext.java | 18 +-
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../org/apache/ignite/internal/IgniteKernal.java | 2 +
.../managers/communication/GridIoManager.java | 8 +-
.../managers/encryption/GridEncryptionManager.java | 18 +-
.../IgniteAuthenticationProcessor.java | 14 +-
.../processors/cache/GridCacheIoManager.java | 23 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 36 +-
.../GridCacheDatabaseSharedManager.java | 36 +-
.../cache/persistence/metastorage/MetaStorage.java | 132 ++-
.../metastorage/ReadOnlyMetastorage.java | 21 +-
.../metastorage/ReadWriteMetastorage.java | 3 +
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../cluster/GridClusterStateProcessor.java | 12 +-
.../metastorage/DistributedMetaStorage.java | 74 ++
.../DistributedMetaStorageListener.java | 45 +
.../DistributedMetastorageLifecycleListener.java | 43 +
.../ReadableDistributedMetaStorage.java | 62 ++
.../persistence/DistributedMetaStorageBridge.java | 91 ++
.../DistributedMetaStorageCasAckMessage.java} | 34 +-
.../DistributedMetaStorageCasMessage.java | 67 ++
.../DistributedMetaStorageClusterNodeData.java | 52 +
.../DistributedMetaStorageHistoryItem.java | 76 ++
.../persistence/DistributedMetaStorageImpl.java | 1169 ++++++++++++++++++++
.../DistributedMetaStorageJoiningNodeData.java} | 33 +-
.../DistributedMetaStorageUpdateAckMessage.java | 97 ++
.../DistributedMetaStorageUpdateMessage.java | 119 ++
.../persistence/DistributedMetaStorageUtil.java | 109 ++
.../persistence/DistributedMetaStorageVersion.java | 159 +++
.../EmptyDistributedMetaStorageBridge.java | 64 ++
...InMemoryCachedDistributedMetaStorageBridge.java | 109 ++
.../NotAvailableDistributedMetaStorageBridge.java | 62 ++
.../ReadOnlyDistributedMetaStorageBridge.java | 212 ++++
.../persistence/StartupExtras.java} | 19 +-
.../WritableDistributedMetaStorageBridge.java | 163 +++
.../GridInternalSubscriptionProcessor.java | 25 +-
.../ignite/marshaller/jdk/JdkMarshaller.java | 3 +
.../encryption/EncryptedCacheDestroyTest.java | 4 +-
.../metastorage/IgniteMetaStorageBasicTest.java | 75 +-
.../DistributedMetaStoragePersistentTest.java | 698 ++++++++++++
.../metastorage/DistributedMetaStorageTest.java | 366 ++++++
.../ignite/testsuites/IgnitePdsTestSuite.java | 4 +
.../persistence/db/wal/IgniteWalRecoveryTest.java | 28 +-
45 files changed, 4212 insertions(+), 202 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 3b3a00b..b20d8ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -29,6 +29,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.util.GridLogThrottle;
import org.apache.ignite.stream.StreamTransformer;
@@ -1089,6 +1090,11 @@ public final class IgniteSystemProperties {
"IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN";
/**
+ * Maximum amount of bytes that can be stored in history of {@link DistributedMetaStorage} updates.
+ */
+ public static final String IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES = "IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 1f9e02e..2e86198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -73,7 +73,10 @@ public interface GridComponent {
ENCRYPTION_MGR,
/** Service processor. */
- SERVICE_PROC
+ SERVICE_PROC,
+
+ /** Distributed MetaStorage processor. */
+ META_STORAGE;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 691fe37..9651290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -33,19 +33,16 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
-import org.apache.ignite.internal.stat.IoStatisticsManager;
-import org.apache.ignite.internal.processors.compress.CompressionProcessor;
-import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
-import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
@@ -57,6 +54,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
@@ -68,14 +66,17 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor;
import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
+import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.stat.IoStatisticsManager;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
@@ -206,6 +207,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public GridClusterStateProcessor state();
/**
+ * Gets global metastorage.
+ *
+ * @return Global metastorage.
+ */
+ public DistributedMetaStorage distributedMetastorage();
+
+ /**
* Gets task session processor.
*
* @return Session processor.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1219d00..cc18d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
@@ -218,6 +219,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@GridToStringInclude
private GridClusterStateProcessor stateProc;
+ /** Global metastorage. */
+ @GridToStringInclude
+ private DistributedMetaStorage distributedMetastorage;
+
/** */
@GridToStringInclude
private GridTaskSessionProcessor sesProc;
@@ -602,6 +607,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
cacheProc = (GridCacheProcessor)comp;
else if (comp instanceof GridClusterStateProcessor)
stateProc = (GridClusterStateProcessor)comp;
+ else if (comp instanceof DistributedMetaStorage)
+ distributedMetastorage = (DistributedMetaStorage)comp;
else if (comp instanceof GridTaskSessionProcessor)
sesProc = (GridTaskSessionProcessor)comp;
else if (comp instanceof GridPortProcessor)
@@ -752,6 +759,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public DistributedMetaStorage distributedMetastorage() {
+ return distributedMetastorage;
+ }
+
+ /** {@inheritDoc} */
@Override public GridTaskSessionProcessor session() {
return sesProc;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 9633f89..30af4f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -149,6 +149,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
@@ -1035,6 +1036,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
startProcessor(new GridMarshallerMappingProcessor(ctx));
+ startProcessor(new DistributedMetaStorageImpl(ctx));
// Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 09c7b96..c1e1684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -205,11 +205,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final AtomicLong ioTestId = new AtomicLong();
/** No-op runnable. */
- private static final IgniteRunnable NOOP = new IgniteRunnable() {
- @Override public void run() {
- // No-op.
- }
- };
+ private static final IgniteRunnable NOOP = () -> {};
/**
* @param ctx Grid kernal context.
@@ -301,7 +297,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
throws IgniteCheckedException {
return new DirectMessageReader(msgFactory,
- rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER);
+ rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : DIRECT_PROTO_VER);
}
};
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index 5667676..dd2db02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -53,7 +52,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureCancelledException;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteNodeValidationResult;
@@ -62,6 +60,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -129,10 +128,6 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
/** Prefix for a encryption group key in meta store. */
public static final String ENCRYPTION_KEY_PREFIX = "grp-encryption-key-";
- /** Encryption key predicate for meta store. */
- private static final IgnitePredicate<String> ENCRYPTION_KEY_PREFIX_PRED =
- (IgnitePredicate<String>)key -> key.startsWith(ENCRYPTION_KEY_PREFIX);
-
/** Group encryption keys. */
private final ConcurrentHashMap<Integer, Serializable> grpEncKeys = new ConcurrentHashMap<>();
@@ -551,18 +546,13 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
try {
- Map<String, ? extends Serializable> encKeys = metastorage.readForPredicate(ENCRYPTION_KEY_PREFIX_PRED);
-
- if (encKeys.isEmpty())
- return;
-
- for (String key : encKeys.keySet()) {
+ metastorage.iterate(ENCRYPTION_KEY_PREFIX, (key, val) -> {
Integer grpId = Integer.valueOf(key.replace(ENCRYPTION_KEY_PREFIX, ""));
- byte[] encGrpKey = (byte[])encKeys.get(key);
+ byte[] encGrpKey = (byte[])val;
grpEncKeys.putIfAbsent(grpId, getSpi().decryptKey(encGrpKey));
- }
+ }, true);
if (!grpEncKeys.isEmpty()) {
U.quietAndInfo(log, "Encryption keys loaded from metastore. [grps=" +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
index 98f4201..5a696f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureCancelledException;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -387,11 +386,11 @@ public class IgniteAuthenticationProcessor extends GridProcessorAdapter implemen
if (!ctx.clientNode()) {
users = new ConcurrentHashMap<>();
- Map<String, User> readUsers = (Map<String, User>)metastorage.readForPredicate(
- (IgnitePredicate<String>)key -> key != null && key.startsWith(STORE_USER_PREFIX));
+ metastorage.iterate(STORE_USER_PREFIX, (key, val) -> {
+ User u = (User)val;
- for (User u : readUsers.values())
users.put(u.name(), u);
+ }, true);
}
else
users = null;
@@ -1318,10 +1317,11 @@ public class IgniteAuthenticationProcessor extends GridProcessorAdapter implemen
sharedCtx.database().checkpointReadLock();
try {
- Map<String, User> existUsrs = (Map<String, User>)metastorage.readForPredicate(
- (IgnitePredicate<String>)key -> key != null && key.startsWith(STORE_USER_PREFIX));
+ Set<String> existUsrsKeys = new HashSet<>();
- for (String key : existUsrs.keySet())
+ metastorage.iterate(STORE_USER_PREFIX, (key, val) -> existUsrsKeys.add(key), false);
+
+ for (String key : existUsrsKeys)
metastorage.remove(key);
for (User u : newUsrs)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 710931f..06c6267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1374,10 +1374,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param type Type of message.
* @param c Handler.
*/
- public void addCacheHandler(
+ public <Msg extends GridCacheMessage> void addCacheHandler(
int hndId,
- Class<? extends GridCacheMessage> type,
- IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+ Class<Msg> type,
+ IgniteBiInClosure<UUID, ? super Msg> c
+ ) {
assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type;
addHandler(hndId, type, c, cacheHandlers);
@@ -1388,10 +1389,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param type Type of message.
* @param c Handler.
*/
- public void addCacheGroupHandler(
+ public <Msg extends GridCacheGroupIdMessage> void addCacheGroupHandler(
int hndId,
- Class<? extends GridCacheGroupIdMessage> type,
- IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+ Class<Msg> type,
+ IgniteBiInClosure<UUID, ? super Msg> c
+ ) {
assert !type.isAssignableFrom(GridCacheIdMessage.class) : type;
addHandler(hndId, type, c, grpHandlers);
@@ -1403,11 +1405,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param c Handler.
* @param msgHandlers Message handlers.
*/
- private void addHandler(
+ private <Msg extends GridCacheMessage> void addHandler(
int hndId,
- Class<? extends GridCacheMessage> type,
- IgniteBiInClosure<UUID, ? extends GridCacheMessage> c,
- MessageHandlers msgHandlers) {
+ Class<Msg> type,
+ IgniteBiInClosure<UUID, ? super Msg> c,
+ MessageHandlers msgHandlers
+ ) {
int msgIdx = messageIndex(type);
if (msgIdx != -1) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a536011..8350fe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -53,6 +53,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.TimeBag;
@@ -1063,7 +1065,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert req != null : exchActions;
- DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState();
+ GridKernalContext kctx = cctx.kernalContext();
+
+ DiscoveryDataClusterState state = kctx.state().clusterState();
if (state.transitionError() != null)
exchangeLocE = state.transitionError();
@@ -1072,7 +1076,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (req.activate()) {
if (log.isInfoEnabled()) {
log.info("Start activation process [nodeId=" + cctx.localNodeId() +
- ", client=" + cctx.kernalContext().clientNode() +
+ ", client=" + kctx.clientNode() +
", topVer=" + initialVersion() + "]");
}
@@ -1093,7 +1097,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
try {
registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
- if (!cctx.kernalContext().clientNode())
+ if (!kctx.clientNode())
cctx.cache().shutdownNotFinishedRecoveryCaches();
}
finally {
@@ -1102,13 +1106,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (log.isInfoEnabled()) {
log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
- ", client=" + cctx.kernalContext().clientNode() +
+ ", client=" + kctx.clientNode() +
", topVer=" + initialVersion() + "]");
}
}
catch (Exception e) {
U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() +
- ", client=" + cctx.kernalContext().clientNode() +
+ ", client=" + kctx.clientNode() +
", topVer=" + initialVersion() + "]", e);
exchangeLocE = e;
@@ -1130,34 +1134,36 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
else {
if (log.isInfoEnabled()) {
log.info("Start deactivation process [nodeId=" + cctx.localNodeId() +
- ", client=" + cctx.kernalContext().clientNode() +
+ ", client=" + kctx.clientNode() +
", topVer=" + initialVersion() + "]");
}
cctx.exchange().exchangerBlockingSectionBegin();
try {
- cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext());
+ ((IgniteChangeGlobalStateSupport)kctx.distributedMetastorage()).onDeActivate(kctx);
+
+ kctx.dataStructures().onDeActivate(kctx);
if (cctx.kernalContext().service() instanceof GridServiceProcessor)
- ((GridServiceProcessor)cctx.kernalContext().service()).onDeActivate(cctx.kernalContext());
+ ((GridServiceProcessor)kctx.service()).onDeActivate(cctx.kernalContext());
assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
- cctx.kernalContext().encryption().onDeActivate(cctx.kernalContext());
+ kctx.encryption().onDeActivate(kctx);
if (log.isInfoEnabled()) {
log.info("Successfully deactivated data structures, services and caches [" +
"nodeId=" + cctx.localNodeId() +
- ", client=" + cctx.kernalContext().clientNode() +
+ ", client=" + kctx.clientNode() +
", topVer=" + initialVersion() + "]");
}
}
catch (Exception e) {
U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() +
- ", client=" + cctx.kernalContext().clientNode() +
+ ", client=" + kctx.clientNode() +
", topVer=" + initialVersion() + "]", e);
exchangeLocE = e;
@@ -1181,13 +1187,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.affinity().onBaselineTopologyChanged(this, crd);
}
- if (CU.isPersistenceEnabled(cctx.kernalContext().config()) && !cctx.kernalContext().clientNode())
- cctx.kernalContext().state().onBaselineTopologyChanged(req.baselineTopology(),
+ if (CU.isPersistenceEnabled(kctx.config()) && !kctx.clientNode())
+ kctx.state().onBaselineTopologyChanged(req.baselineTopology(),
req.prevBaselineTopologyHistoryItem());
}
catch (Exception e) {
U.error(log, "Failed to change baseline topology [nodeId=" + cctx.localNodeId() +
- ", client=" + cctx.kernalContext().clientNode() +
+ ", client=" + kctx.clientNode() +
", topVer=" + initialVersion() + "]", e);
exchangeLocE = e;
@@ -1197,7 +1203,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+ return kctx.clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c85099f..d7ad8c1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -241,13 +241,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Prefix for meta store records which means that checkpoint entry for some group is not applicable for WAL rebalance. */
private static final String CHECKPOINT_INAPPLICABLE_FOR_REBALANCE = "cp-wal-rebalance-inapplicable-";
- /** WAL marker predicate for meta store. */
- private static final IgnitePredicate<String> WAL_KEY_PREFIX_PRED = new IgnitePredicate<String>() {
- @Override public boolean apply(String key) {
- return key.startsWith(WAL_KEY_PREFIX);
- }
- };
-
/** Timeout between partition file destroy and checkpoint to handle it. */
private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
@@ -2214,9 +2207,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (pageMem == null)
break;
- // Here we do not require tag check because we may be applying memory changes after
- // several repetitive restarts and the same pages may have changed several times.
- long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, true);
+ // Here we do not require tag check because we may be applying memory changes after
+ // several repetitive restarts and the same pages may have changed several times.
+ long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, true);
try {
long pageAddr = pageMem.writeLock(grpId, pageId, page, true);
@@ -4616,23 +4609,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
assert metaStorage != null;
try {
- Set<String> keys = metaStorage.readForPredicate(WAL_KEY_PREFIX_PRED).keySet();
-
- if (keys.isEmpty())
- return;
-
- for (String key : keys) {
+ metaStorage.iterate(WAL_KEY_PREFIX, (key, val) -> {
T2<Integer, Boolean> t2 = walKeyToGroupIdAndLocalFlag(key);
- if (t2 == null)
- continue;
-
- if (t2.get2())
- initiallyLocalWalDisabledGrps.add(t2.get1());
- else
- initiallyGlobalWalDisabledGrps.add(t2.get1());
- }
-
+ if (t2 != null) {
+ if (t2.get2())
+ initiallyLocalWalDisabledGrps.add(t2.get1());
+ else
+ initiallyGlobalWalDisabledGrps.add(t2.get1());
+ }
+ }, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to read cache groups WAL state.", e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 7ff8257..0906605 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -27,13 +27,14 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -72,7 +73,6 @@ import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.NotNull;
@@ -85,7 +85,7 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
/**
* General purpose key-value local-only storage.
*/
-public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, ReadWriteMetastorage {
+public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
/** */
public static final String METASTORAGE_CACHE_NAME = "MetaStorage";
@@ -139,10 +139,10 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
private FreeListImpl freeList;
/** */
- private Map<String, byte[]> lastUpdates;
+ private SortedMap<String, byte[]> lastUpdates;
/** */
- private final Marshaller marshaller = new JdkMarshaller();
+ private final Marshaller marshaller = JdkMarshaller.DEFAULT;
/** */
private final FailureProcessor failureProcessor;
@@ -155,7 +155,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
/** */
public MetaStorage(
- GridCacheSharedContext cctx,
+ GridCacheSharedContext<?, ?> cctx,
DataRegion dataRegion,
DataRegionMetricsImpl regionMetrics,
boolean readOnly
@@ -224,7 +224,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
for (Iterator<IgniteBiTuple<String, byte[]>> it = tmpStorage.stream().iterator(); it.hasNext(); ) {
IgniteBiTuple<String, byte[]> t = it.next();
- putData(t.get1(), t.get2());
+ writeRaw(t.get1(), t.get2());
}
try {
@@ -261,45 +261,50 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
/** {@inheritDoc} */
@Override public Serializable read(String key) throws IgniteCheckedException {
- byte[] data = getData(key);
+ byte[] data = readRaw(key);
- Object result = null;
+ Serializable res = null;
if (data != null)
- result = marshaller.unmarshal(data, getClass().getClassLoader());
+ res = marshaller.unmarshal(data, U.gridClassLoader());
- return (Serializable)result;
+ return res;
}
+
/** {@inheritDoc} */
- @Override public Map<String, ? extends Serializable> readForPredicate(IgnitePredicate<String> keyPred)
- throws IgniteCheckedException {
- Map<String, Serializable> res = null;
+ @Override public void iterate(
+ String keyPrefix,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) throws IgniteCheckedException {
+ if (empty)
+ return;
- if (readOnly) {
- if (empty)
- return Collections.emptyMap();
+ Iterator<Map.Entry<String, byte[]>> updatesIter = null;
+ if (readOnly) {
if (lastUpdates != null) {
- for (Map.Entry<String, byte[]> lastUpdate : lastUpdates.entrySet()) {
- if (keyPred.apply(lastUpdate.getKey())) {
- byte[] valBytes = lastUpdate.getValue();
+ SortedMap<String, byte[]> prefixedSubmap = lastUpdates.subMap(keyPrefix, keyPrefix + "\uFFFF");
- if (valBytes == TOMBSTONE)
- continue;
+ if (!prefixedSubmap.isEmpty())
+ updatesIter = prefixedSubmap.entrySet().iterator();
+ }
+ }
- if (res == null)
- res = new HashMap<>();
+ Map.Entry<String, byte[]> curUpdatesEntry = null;
- Serializable val = marshaller.unmarshal(valBytes, getClass().getClassLoader());
+ if (updatesIter != null) {
+ assert updatesIter.hasNext();
- res.put(lastUpdate.getKey(), val);
- }
- }
- }
+ curUpdatesEntry = updatesIter.next();
}
- GridCursor<MetastorageDataRow> cur = tree.find(null, null);
+ MetastorageDataRow lower = new MetastorageDataRow(keyPrefix, null);
+
+ MetastorageDataRow upper = new MetastorageDataRow(keyPrefix + "\uFFFF", null);
+
+ GridCursor<MetastorageDataRow> cur = tree.find(lower, upper);
while (cur.next()) {
MetastorageDataRow row = cur.get();
@@ -307,24 +312,49 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
String key = row.key();
byte[] valBytes = row.value();
- if (keyPred.apply(key)) {
- // Either already added it, or this is a tombstone -> ignore.
- if (lastUpdates != null && lastUpdates.containsKey(key))
- continue;
-
- if (res == null)
- res = new HashMap<>();
+ int c = 0;
- Serializable val = marshaller.unmarshal(valBytes, getClass().getClassLoader());
+ while (curUpdatesEntry != null && (c = curUpdatesEntry.getKey().compareTo(key)) < 0)
+ curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
- res.put(key, val);
- }
+ if (curUpdatesEntry != null && c == 0)
+ curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+ else
+ applyCallback(cb, unmarshal, key, valBytes);
}
- if (res == null)
- res = Collections.emptyMap();
+ while (curUpdatesEntry != null)
+ curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
+ }
- return res;
+ /** */
+ private Map.Entry<String, byte[]> advanceCurrentUpdatesEntry(
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal,
+ Iterator<Map.Entry<String, byte[]>> updatesIter,
+ Map.Entry<String, byte[]> curUpdatesEntry
+ ) throws IgniteCheckedException {
+ applyCallback(cb, unmarshal, curUpdatesEntry.getKey(), curUpdatesEntry.getValue());
+
+ return updatesIter.hasNext() ? updatesIter.next() : null;
+ }
+
+ /** */
+ private void applyCallback(
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal,
+ String key,
+ byte[] valBytes
+ ) throws IgniteCheckedException {
+ if (valBytes != TOMBSTONE) {
+ if (unmarshal) {
+ Serializable val = marshaller.unmarshal(valBytes, U.gridClassLoader());
+
+ cb.accept(key, val);
+ }
+ else
+ cb.accept(key, valBytes);
+ }
}
/**
@@ -350,7 +380,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
byte[] data = marshaller.marshal(val);
- putData(key, data);
+ writeRaw(key, data);
}
/** {@inheritDoc} */
@@ -358,8 +388,8 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
removeData(key);
}
- /** */
- public void putData(String key, byte[] data) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void writeRaw(String key, byte[] data) throws IgniteCheckedException {
if (!readOnly) {
WALPointer ptr = wal.log(new MetastoreDataRecord(key, data));
@@ -380,8 +410,8 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
}
}
- /** */
- public byte[] getData(String key) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public byte[] readRaw(String key) throws IgniteCheckedException {
if (readOnly) {
if (lastUpdates != null) {
byte[] res = lastUpdates.get(key);
@@ -608,13 +638,13 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
public void applyUpdate(String key, byte[] value) throws IgniteCheckedException {
if (readOnly) {
if (lastUpdates == null)
- lastUpdates = new HashMap<>();
+ lastUpdates = new TreeMap<>();
lastUpdates.put(key, value != null ? value : TOMBSTONE);
}
else {
if (value != null)
- putData(key, value);
+ writeRaw(key, value);
else
removeData(key);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
index 86cbf0d..40d2245 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.processors.cache.persistence.metastorage;
import java.io.Serializable;
-import java.util.Map;
-
+import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.lang.IgnitePredicate;
/**
*
@@ -29,12 +27,21 @@ public interface ReadOnlyMetastorage {
/** */
Serializable read(String key) throws IgniteCheckedException;
+ /** */
+ byte[] readRaw(String key) throws IgniteCheckedException;
+
/**
- * Read all keys matching provided predicate.
+ * Read all key/value pairs where key has provided prefix.
+ * It is guaranteed that callback will be applied to matching keys in ascending order.
*
- * @param keyPred Key predicate.
- * @return Matched key-value pairs.
+ * @param keyPrefix Key prefix.
+ * @param cb Callback to invoke on each matching key/value pair.
+ * @param unmarshal {@code True} if object passed into {@code cb} should be unmarshalled.
* @throws IgniteCheckedException If failed.
*/
- Map<String, ? extends Serializable> readForPredicate(IgnitePredicate<String> keyPred) throws IgniteCheckedException;
+ public void iterate(
+ String keyPrefix,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) throws IgniteCheckedException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
index ab48afc..41cd7ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
@@ -28,5 +28,8 @@ public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
/** */
+ public void writeRaw(String key, byte[] data) throws IgniteCheckedException;
+
+ /** */
public void remove(@NotNull String key) throws IgniteCheckedException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e56b05b..72f2f3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
@@ -301,6 +302,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
}
/** {@inheritDoc} */
+ @Override public DistributedMetaStorage distributedMetastorage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public GridTaskSessionProcessor session() {
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index b347d39..e04feff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -66,7 +66,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -630,11 +629,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
sharedCtx.io().addCacheHandler(
0, GridChangeGlobalStateMessageResponse.class,
- new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
- @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
- processChangeGlobalStateResponse(nodeId, msg);
- }
- });
+ this::processChangeGlobalStateResponse
+ );
}
/** {@inheritDoc} */
@@ -1163,8 +1159,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
@Override public void run() {
boolean client = ctx.clientNode();
- Exception e = null;
-
try {
if (ctx.service() instanceof GridServiceProcessor) {
GridServiceProcessor srvcProc = (GridServiceProcessor)ctx.service();
@@ -1182,6 +1176,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
ctx.encryption().onActivate(ctx);
+ ((IgniteChangeGlobalStateSupport)ctx.distributedMetastorage()).onActivate(ctx);
+
if (log.isInfoEnabled())
log.info("Successfully performed final activation steps [nodeId="
+ ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorage.java
new file mode 100644
index 0000000..47236cb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorage.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * API for distributed data storage with the ability to write into it.
+ *
+ * @see ReadableDistributedMetaStorage
+ */
+public interface DistributedMetaStorage extends ReadableDistributedMetaStorage {
+ /**
+ * Write value into distributed metastorage.
+ *
+ * @param key The key.
+ * @param val Value to write. Must not be null.
+ * @throws IgniteCheckedException If cluster is in deactivated state.
+ */
+ void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+
+ /**
+ * Remove value from distributed metastorage.
+ *
+ * @param key The key.
+ * @throws IgniteCheckedException If cluster is in deactivated state.
+ */
+ void remove(@NotNull String key) throws IgniteCheckedException;
+
+ /**
+ * Write value into distributed metastorage but only if current value matches the expected one.
+ *
+ * @param key The key.
+ * @param expVal Expected value. Might be null.
+ * @param newVal Value to write. Must not be null.
+ * @throws IgniteCheckedException If cluster is in deactivated state.
+ * @return {@code True} if expected value matched the actual one and write was completed successfully.
+ * {@code False} otherwise.
+ */
+ boolean compareAndSet(
+ @NotNull String key,
+ @Nullable Serializable expVal,
+ @NotNull Serializable newVal
+ ) throws IgniteCheckedException;
+
+ /**
+ * Remove value from distributed metastorage but only if current value matches the expected one.
+ *
+ * @param key The key.
+ * @param expVal Expected value. Must not be null.
+ * @throws IgniteCheckedException If cluster is in deactivated state.
+ * @return {@code True} if expected value matched the actual one and remove was completed successfully.
+ * {@code False} otherwise.
+ */
+ boolean compareAndRemove(@NotNull String key, @NotNull Serializable expVal) throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageListener.java
new file mode 100644
index 0000000..8c5cb30
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.io.Serializable;
+import java.util.function.Predicate;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Listener for distributed metastorage data updates.
+ *
+ * @see ReadableDistributedMetaStorage#listen(Predicate, DistributedMetaStorageListener)
+ */
+@FunctionalInterface
+public interface DistributedMetaStorageListener<T extends Serializable> {
+ /**
+ * Invoked in two cases:
+ * <ul>
+ * <li>data was dinamicaly updated;</li>
+ * <li>node was activated. In this case {@code oldVal} and {@code newVal} might be different only if new data
+ * was received from cluster before activation</li>
+ * </ul>
+ *
+ * @param key The key.
+ * @param oldVal Previous value associated with the key.
+ * @param newVal New value after update.
+ */
+ void onUpdate(@NotNull String key, @Nullable T oldVal, @Nullable T newVal);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetastorageLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetastorageLifecycleListener.java
new file mode 100644
index 0000000..6c07ddc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/DistributedMetastorageLifecycleListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+/**
+ * Listener for {@link DistributedMetaStorage} lifecycle events.
+ */
+public interface DistributedMetastorageLifecycleListener {
+ /**
+ * Called when global metastorage is ready for reading.
+ * <br>
+ * Normally this is the place where you should add listeners and read required data.
+ * Note that this data might be outdated.
+ *
+ * @param metastorage Read-only global metastorage.
+ * @see DistributedMetaStorageListener
+ */
+ default void onReadyForRead(ReadableDistributedMetaStorage metastorage) {}
+
+ /**
+ * Called when global metastorage is available for writing. Given instance guaranteed to be
+ * valid until cluster deactivation. In such case this method will be invoked once cluster
+ * is reactivated.
+ *
+ * @param metastorage Global metastorage instance.
+ */
+ default void onReadyForWrite(DistributedMetaStorage metastorage) {}
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/ReadableDistributedMetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/ReadableDistributedMetaStorage.java
new file mode 100644
index 0000000..7047079
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/ReadableDistributedMetaStorage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * API for distributed data storage. It is guaranteed that every read value is the same on every node in the cluster
+ * all the time.
+ */
+public interface ReadableDistributedMetaStorage {
+ /**
+ * Get value by the key.
+ *
+ * @param key The key.
+ * @return Value associated with the key.
+ * @throws IgniteCheckedException If reading or unmarshalling failed.
+ */
+ @Nullable <T extends Serializable> T read(@NotNull String key) throws IgniteCheckedException;
+
+ /**
+ * Iterate over all values corresponding to the keys with given prefix. It is guaranteed that iteration will be
+ * executed in ascending keys order.
+ *
+ * @param keyPrefix Prefix for the keys that will be iterated.
+ * @param cb Callback that will be applied to all {@code <key, value>} pairs.
+ * @throws IgniteCheckedException If reading or unmarshalling failed.
+ */
+ void iterate(
+ @NotNull String keyPrefix,
+ @NotNull BiConsumer<String, ? super Serializable> cb
+ ) throws IgniteCheckedException;
+
+ /**
+ * Add listener on data updates.
+ *
+ * @param keyPred Predicate to check whether this listener should be invoked on given key update or not.
+ * @param lsnr Listener object.
+ * @see DistributedMetaStorageListener
+ */
+ void listen(@NotNull Predicate<String> keyPred, DistributedMetaStorageListener<?> lsnr);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageBridge.java
new file mode 100644
index 0000000..f6d83fa
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageBridge.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Bridge interface to access data storage in {@link DistributedMetaStorageImpl}.
+ */
+interface DistributedMetaStorageBridge {
+ /**
+ * Get data by key.
+ *
+ * @param globalKey The key.
+ * @param unmarshal Whether the value should be unmarshalled or not.
+ * @return Value associated with the key.
+ * @throws IgniteCheckedException If reading or unmarshalling failed.
+ */
+ Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException;
+
+ /**
+ * Iterate over all values corresponding to the keys with given prefix. It is guaranteed that iteration will be
+ * executed in ascending keys order.
+ *
+ * @param globalKeyPrefix Prefix for the keys that will be iterated.
+ * @param cb Callback that will be applied to all {@code <key, value>} pairs.
+ * @throws IgniteCheckedException If reading or unmarshalling failed.
+ */
+ void iterate(
+ String globalKeyPrefix,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) throws IgniteCheckedException;
+
+ /**
+ * Write data into storage.
+ *
+ * @param globalKey The key.
+ * @param valBytes Value bytes.
+ * @throws IgniteCheckedException If some IO problem occured.
+ */
+ void write(String globalKey, @Nullable byte[] valBytes) throws IgniteCheckedException;
+
+ /**
+ * Invoked when update message was received. Prepares storage to the writing of new value and notifies listeners
+ * (optionally).
+ *
+ * @param histItem Update data.
+ * @param val Unmarshalled value that needs to be written. This value is ignored if listeners shouldn't be notified.
+ * @param notifyListeners Whether listeners should be notified about update or not.
+ * @throws IgniteCheckedException If some IO or unmarshalling errors occured.
+ */
+ void onUpdateMessage(
+ DistributedMetaStorageHistoryItem histItem,
+ Serializable val,
+ boolean notifyListeners
+ ) throws IgniteCheckedException;
+
+ /**
+ * Remove information about the specific update from the history.
+ *
+ * @param ver Specific version for which the update information should be deleted.
+ * @throws IgniteCheckedException If some IO error occured.
+ */
+ void removeHistoryItem(long ver) throws IgniteCheckedException;
+
+ /**
+ * Returns all {@code <key, value>} pairs currently stored in distributed metastorage. Values are not unmarshalled.
+ *
+ * @return Array of all keys and values.
+ */
+ DistributedMetaStorageHistoryItem[] localFullData() throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java
similarity index 53%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java
index ab48afc..a403454 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java
@@ -14,19 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.persistence.metastorage;
-import java.io.Serializable;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final boolean updated;
-/**
- *
- */
-public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
/** */
- public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+ public DistributedMetaStorageCasAckMessage(UUID reqId, boolean active, boolean updated) {
+ super(reqId, active);
+ this.updated = updated;
+ }
/** */
- public void remove(@NotNull String key) throws IgniteCheckedException;
+ public boolean updated() {
+ return updated;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedMetaStorageCasAckMessage.class, this);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
new file mode 100644
index 0000000..d5d5f8f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final byte[] expectedVal;
+
+ /** */
+ private boolean matches = true;
+
+ /** */
+ public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) {
+ super(reqId, key, valBytes);
+
+ expectedVal = expValBytes;
+ }
+
+ /** */
+ public byte[] expectedValue() {
+ return expectedVal;
+ }
+
+ /** */
+ public void setMatches(boolean matches) {
+ this.matches = matches;
+ }
+
+ /** */
+ public boolean matches() {
+ return matches;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public DiscoveryCustomMessage ackMessage() {
+ return new DistributedMetaStorageCasAckMessage(requestId(), isActive(), matches);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedMetaStorageCasMessage.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageClusterNodeData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageClusterNodeData.java
new file mode 100644
index 0000000..94351dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageClusterNodeData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+
+/** */
+@SuppressWarnings("PublicField")
+class DistributedMetaStorageClusterNodeData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public final DistributedMetaStorageVersion ver;
+
+ /** */
+ public final DistributedMetaStorageHistoryItem[] fullData;
+
+ /** */
+ public final DistributedMetaStorageHistoryItem[] hist;
+
+ /** */
+ public DistributedMetaStorageHistoryItem[] updates;
+
+ /** */
+ public DistributedMetaStorageClusterNodeData(
+ DistributedMetaStorageVersion ver,
+ DistributedMetaStorageHistoryItem[] fullData,
+ DistributedMetaStorageHistoryItem[] hist,
+ DistributedMetaStorageHistoryItem[] updates
+ ) {
+ this.fullData = fullData;
+ this.ver = ver;
+ this.hist = hist;
+ this.updates = updates;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageHistoryItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageHistoryItem.java
new file mode 100644
index 0000000..2d1ab50
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageHistoryItem.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+@SuppressWarnings("PublicField")
+class DistributedMetaStorageHistoryItem implements Serializable {
+ /** */
+ public static final DistributedMetaStorageHistoryItem[] EMPTY_ARRAY = {};
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ public final String key;
+
+ /** */
+ @GridToStringInclude
+ public final byte[] valBytes;
+
+ /** */
+ public DistributedMetaStorageHistoryItem(String key, byte[] valBytes) {
+ this.key = key;
+ this.valBytes = valBytes;
+ }
+
+ /** */
+ public long estimateSize() {
+ // String encoding is ignored to make estimation faster. 2 "size" values added as well.
+ return 8 + key.length() * 2 + (valBytes == null ? 0 : valBytes.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DistributedMetaStorageHistoryItem item = (DistributedMetaStorageHistoryItem)o;
+
+ return key.equals(item.key) && Arrays.equals(valBytes, item.valBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * key.hashCode() + Arrays.hashCode(valBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedMetaStorageHistoryItem.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
new file mode 100644
index 0000000..d96c73d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -0,0 +1,1169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.LongStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cluster.BaselineTopology;
+import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageListener;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.META_STORAGE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemPrefix;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemVer;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.marshal;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
+
+/**
+ * Implementation of {@link DistributedMetaStorage} based on {@link MetaStorage} for persistence and discovery SPI
+ * for communication.
+ */
+public class DistributedMetaStorageImpl extends GridProcessorAdapter
+ implements DistributedMetaStorage, IgniteChangeGlobalStateSupport
+{
+ /** Component ID required for {@link DiscoveryDataBag} instances. */
+ private static final int COMPONENT_ID = META_STORAGE.ordinal();
+
+ /** Default upper bound of history size in bytes. */
+ private static final long DFLT_MAX_HISTORY_BYTES = 100 * 1024 * 1024;
+
+ /** Cached subscription processor instance. Exists to make code shorter. */
+ private final GridInternalSubscriptionProcessor subscrProcessor;
+
+ /** Bridge. Has some "phase-specific" code. Exists to avoid countless {@code if}s in code. */
+ private volatile DistributedMetaStorageBridge bridge = new NotAvailableDistributedMetaStorageBridge();
+
+ /**
+ * {@link MetastorageLifecycleListener#onReadyForReadWrite(ReadWriteMetastorage)} is invoked asynchronously after
+ * cluster activation so there's a chance of a gap where someone alreasy tries to write data but distributed
+ * metastorage is not "writeable". Current latch aims to resolve this issue - every "write" action waits for it
+ * before actually trying to write anything.
+ */
+ private volatile CountDownLatch writeAvailable = new CountDownLatch(1);
+
+ /**
+ * Version of distributed metastorage.
+ */
+ volatile DistributedMetaStorageVersion ver;
+
+ /** Listeners set. */
+ final Set<IgniteBiTuple<Predicate<String>, DistributedMetaStorageListener<Serializable>>> lsnrs =
+ new GridConcurrentLinkedHashSet<>();
+
+ /**
+ * Map that contains latest changes in distributed metastorage. There should be no gaps in versions and the latest
+ * version is always present in the map. This means that the map is empty only if version is 0.
+ */
+ //TODO Use something similar to java.util.ArrayDeque.
+ private final Map<Long, DistributedMetaStorageHistoryItem> histCache = new ConcurrentHashMap<>();
+
+ /** Approximate number of bytes in values of {@link #histCache} map. */
+ private long histSizeApproximation;
+
+ /**
+ * Maximal acceptable value of {@link #histSizeApproximation}. After every write history would shrink until its size
+ * is not greater then given value.
+ */
+ private final long histMaxBytes = IgniteSystemProperties.getLong(
+ IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES,
+ DFLT_MAX_HISTORY_BYTES
+ );
+
+ /**
+ * Map with futures used to wait for async write/remove operations completion.
+ */
+ private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> updateFuts = new ConcurrentHashMap<>();
+
+ /**
+ * Some extra values that are useful only when node is not active. Otherwise it is nullized to remove
+ * excessive data from the heap.
+ *
+ * @see StartupExtras
+ */
+ private volatile StartupExtras startupExtras = new StartupExtras();
+
+ /**
+ * Lock to access/update {@link #bridge} and {@link #startupExtras} fields (probably some others as well).
+ */
+ private final Object innerStateLock = new Object();
+
+ /**
+ * Becomes {@code true} if node was deactivated, this information is useful for joining node validation.
+ *
+ * @see #validateNode(ClusterNode, DiscoveryDataBag.JoiningNodeDiscoveryData)
+ */
+ private boolean wasDeactivated;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public DistributedMetaStorageImpl(GridKernalContext ctx) {
+ super(ctx);
+
+ subscrProcessor = ctx.internalSubscriptionProcessor();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (ctx.clientNode())
+ return;
+
+ if (isPersistenceEnabled(ctx.config())) {
+ subscrProcessor.registerMetastorageListener(new MetastorageLifecycleListener() {
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(
+ ReadOnlyMetastorage metastorage
+ ) throws IgniteCheckedException {
+ onMetaStorageReadyForRead(metastorage);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReadyForReadWrite(
+ ReadWriteMetastorage metastorage
+ ) throws IgniteCheckedException {
+ onMetaStorageReadyForWrite(metastorage);
+ }
+ });
+ }
+ else {
+ ver = DistributedMetaStorageVersion.INITIAL_VERSION;
+
+ bridge = new EmptyDistributedMetaStorageBridge();
+
+ for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+ subscriber.onReadyForRead(this);
+ }
+
+ GridDiscoveryManager discovery = ctx.discovery();
+
+ discovery.setCustomEventListener(
+ DistributedMetaStorageUpdateMessage.class,
+ this::onUpdateMessage
+ );
+
+ discovery.setCustomEventListener(
+ DistributedMetaStorageUpdateAckMessage.class,
+ this::onAckMessage
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+ if (active)
+ onActivate(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ if (ctx.clientNode())
+ return;
+
+ if (!isPersistenceEnabled(ctx.config())) {
+ if (!(bridge instanceof InMemoryCachedDistributedMetaStorageBridge)) {
+ synchronized (innerStateLock) {
+ assert startupExtras != null;
+
+ InMemoryCachedDistributedMetaStorageBridge memCachedBridge =
+ new InMemoryCachedDistributedMetaStorageBridge(this);
+
+ memCachedBridge.restore(startupExtras);
+
+ executeDeferredUpdates(memCachedBridge);
+
+ bridge = memCachedBridge;
+
+ startupExtras = null;
+ }
+ }
+
+ for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+ subscriber.onReadyForWrite(this);
+
+ writeAvailable.countDown();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) {
+ if (ctx.clientNode())
+ return;
+
+ synchronized (innerStateLock) {
+ wasDeactivated = true;
+
+ if (isPersistenceEnabled(ctx.config())) {
+ try {
+ DistributedMetaStorageHistoryItem[] locFullData = bridge.localFullData();
+
+ bridge = new ReadOnlyDistributedMetaStorageBridge(locFullData);
+ }
+ catch (IgniteCheckedException e) {
+ throw criticalError(e);
+ }
+
+ startupExtras = new StartupExtras();
+ }
+
+ if (writeAvailable.getCount() > 0)
+ writeAvailable.countDown();
+
+ writeAvailable = new CountDownLatch(1);
+ }
+ }
+
+ /** Whether cluster is active at this moment or not. Also returns {@code true} if cluster is being activated. */
+ private boolean isActive() {
+ return ctx.state().clusterState().active();
+ }
+
+ /**
+ * Implementation for {@link MetastorageLifecycleListener#onReadyForRead(ReadOnlyMetastorage)} listener.
+ * Invoked after node was started but before it was activated (only in persistent clusters).
+ *
+ * @param metastorage Local metastorage instance available for reading.
+ * @throws IgniteCheckedException If there were any issues while metastorage reading.
+ * @see MetastorageLifecycleListener#onReadyForRead(ReadOnlyMetastorage)
+ */
+ private void onMetaStorageReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
+ assert isPersistenceEnabled(ctx.config());
+
+ assert startupExtras != null;
+
+ ReadOnlyDistributedMetaStorageBridge readOnlyBridge = new ReadOnlyDistributedMetaStorageBridge();
+
+ lock();
+
+ try {
+ ver = readOnlyBridge.readInitialData(metastorage, startupExtras);
+
+ metastorage.iterate(
+ historyItemPrefix(),
+ (key, val) -> addToHistoryCache(historyItemVer(key), (DistributedMetaStorageHistoryItem)val),
+ true
+ );
+ }
+ finally {
+ unlock();
+ }
+
+ bridge = readOnlyBridge;
+
+ for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+ subscriber.onReadyForRead(this);
+ }
+
+ /**
+ * Implementation for {@link MetastorageLifecycleListener#onReadyForReadWrite(ReadWriteMetastorage)} listener.
+ * Invoked after each activation (only in persistent clusters).
+ *
+ * @param metastorage Local metastorage instance available for writing.
+ * @throws IgniteCheckedException If there were any errors while accessing local metastorage.
+ * @see MetastorageLifecycleListener#onReadyForReadWrite(ReadWriteMetastorage)
+ */
+ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException {
+ assert isPersistenceEnabled(ctx.config());
+
+ synchronized (innerStateLock) {
+ WritableDistributedMetaStorageBridge writableBridge = new WritableDistributedMetaStorageBridge(this, metastorage);
+
+ if (startupExtras != null) {
+ lock();
+
+ try {
+ writableBridge.restore(startupExtras);
+ }
+ finally {
+ unlock();
+ }
+
+ executeDeferredUpdates(writableBridge);
+ }
+
+ bridge = writableBridge;
+
+ startupExtras = null;
+ }
+
+ for (DistributedMetastorageLifecycleListener subscriber : subscrProcessor.getDistributedMetastorageSubscribers())
+ subscriber.onReadyForWrite(this);
+
+ writeAvailable.countDown();
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public <T extends Serializable> T read(@NotNull String key) throws IgniteCheckedException {
+ lock();
+
+ try {
+ return (T)bridge.read(key, true);
+ }
+ finally {
+ unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
+ assert val != null : key;
+
+ startWrite(key, marshal(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(@NotNull String key) throws IgniteCheckedException {
+ startWrite(key, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean compareAndSet(
+ @NotNull String key,
+ @Nullable Serializable expVal,
+ @NotNull Serializable newVal
+ ) throws IgniteCheckedException {
+ assert newVal != null : key;
+
+ return startCas(key, marshal(expVal), marshal(newVal));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean compareAndRemove(
+ @NotNull String key,
+ @NotNull Serializable expVal
+ ) throws IgniteCheckedException {
+ assert expVal != null : key;
+
+ return startCas(key, marshal(expVal), null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void iterate(
+ @NotNull String keyPrefix,
+ @NotNull BiConsumer<String, ? super Serializable> cb
+ ) throws IgniteCheckedException {
+ lock();
+
+ try {
+ bridge.iterate(keyPrefix, cb, true);
+ }
+ finally {
+ unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void listen(@NotNull Predicate<String> keyPred, DistributedMetaStorageListener<?> lsnr) {
+ DistributedMetaStorageListener<Serializable> lsnrUnchecked = (DistributedMetaStorageListener<Serializable>)lsnr;
+
+ lsnrs.add(new IgniteBiTuple<>(keyPred, lsnrUnchecked));
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public DiscoveryDataExchangeType discoveryDataType() {
+ return META_STORAGE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ if (ctx.clientNode())
+ return;
+
+ assert startupExtras != null;
+
+ DistributedMetaStorageHistoryItem[] hist = new TreeMap<>(histCache) // Sorting might be avoided if histCache is a queue
+ .values()
+ .toArray(EMPTY_ARRAY);
+
+ DistributedMetaStorageVersion verToSnd = bridge instanceof ReadOnlyDistributedMetaStorageBridge
+ ? ((ReadOnlyDistributedMetaStorageBridge)bridge).version()
+ : ver;
+
+ Serializable data = new DistributedMetaStorageJoiningNodeData(
+ getBaselineTopologyId(),
+ verToSnd,
+ hist
+ );
+
+ dataBag.addJoiningNodeData(COMPONENT_ID, data);
+ }
+
+ /** Returns current baseline topology id of {@code -1} if there's no baseline topology found. */
+ private int getBaselineTopologyId() {
+ BaselineTopology baselineTop = ctx.state().clusterState().baselineTopology();
+
+ return baselineTop != null ? baselineTop.id() : -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public IgniteNodeValidationResult validateNode(
+ ClusterNode node,
+ DiscoveryDataBag.JoiningNodeDiscoveryData discoData
+ ) {
+ if (ctx.clientNode())
+ return null;
+
+ if (!discoData.hasJoiningNodeData() || !isPersistenceEnabled(ctx.config()))
+ return null;
+
+ DistributedMetaStorageJoiningNodeData joiningData =
+ (DistributedMetaStorageJoiningNodeData)discoData.joiningNodeData();
+
+ DistributedMetaStorageVersion remoteVer = joiningData.ver;
+
+ DistributedMetaStorageHistoryItem[] remoteHist = joiningData.hist;
+
+ int remoteHistSize = remoteHist.length;
+
+ int remoteBltId = joiningData.bltId;
+
+ boolean clusterIsActive = isActive();
+
+ String errorMsg;
+
+ synchronized (innerStateLock) {
+ DistributedMetaStorageVersion locVer = getActualVersion();
+
+ int locBltId = getBaselineTopologyId();
+
+ int locHistSize = getAvailableHistorySize();
+
+ if (remoteVer.id < locVer.id - locHistSize) {
+ // Remote node is too far behind.
+ // Technicaly this situation should be banned because there's no way to prove data consistency.
+ errorMsg = null;
+ }
+ else if (remoteVer.id < locVer.id) {
+ // Remote node it behind the cluster version and there's enough history.
+ DistributedMetaStorageVersion newRemoteVer = remoteVer.nextVersion(
+ this::historyItem,
+ remoteVer.id + 1,
+ locVer.id
+ );
+
+ if (!newRemoteVer.equals(locVer))
+ errorMsg = "Joining node has conflicting distributed metastorage data.";
+ else
+ errorMsg = null;
+ }
+ else if (remoteVer.id == locVer.id) {
+ // Remote and local versions match.
+ if (!remoteVer.equals(locVer)) {
+ errorMsg = S.toString(
+ "Joining node has conflicting distributed metastorage data:",
+ "clusterVersion", locVer, false,
+ "joiningNodeVersion", remoteVer, false
+ );
+ }
+ else
+ errorMsg = null;
+ }
+ else if (remoteVer.id <= locVer.id + remoteHistSize) {
+ // Remote node is ahead of the cluster and has enough history.
+ if (clusterIsActive) {
+ errorMsg = "Attempting to join node with larger distributed metastorage version id." +
+ " The node is most likely in invalid state and can't be joined.";
+ }
+ else if (wasDeactivated || remoteBltId < locBltId)
+ errorMsg = "Joining node has conflicting distributed metastorage data.";
+ else {
+ DistributedMetaStorageVersion newLocVer = locVer.nextVersion(
+ remoteHist,
+ remoteHistSize - (int)(remoteVer.id - locVer.id),
+ remoteHistSize
+ );
+
+ if (!newLocVer.equals(remoteVer))
+ errorMsg = "Joining node has conflicting distributed metastorage data.";
+ else
+ errorMsg = null;
+ }
+ }
+ else {
+ assert remoteVer.id > locVer.id + remoteHistSize;
+
+ // Remote node is too far ahead.
+ if (clusterIsActive) {
+ errorMsg = "Attempting to join node with larger distributed metastorage version id." +
+ " The node is most likely in invalid state and can't be joined.";
+ }
+ else if (wasDeactivated || remoteBltId < locBltId)
+ errorMsg = "Joining node has conflicting distributed metastorage data.";
+ else {
+ errorMsg = "Joining node doesn't have enough history items in distributed metastorage data." +
+ " Please check the order in which you start cluster nodes.";
+ }
+ }
+ }
+
+ return (errorMsg == null) ? null : new IgniteNodeValidationResult(node.id(), errorMsg, errorMsg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ if (ctx.clientNode())
+ return;
+
+ DiscoveryDataBag.JoiningNodeDiscoveryData discoData = dataBag.newJoinerDiscoveryData(COMPONENT_ID);
+
+ if (!discoData.hasJoiningNodeData())
+ return;
+
+ DistributedMetaStorageJoiningNodeData joiningData =
+ (DistributedMetaStorageJoiningNodeData)discoData.joiningNodeData();
+
+ if (joiningData == null)
+ return;
+
+ DistributedMetaStorageVersion remoteVer = joiningData.ver;
+
+ synchronized (innerStateLock) {
+ //TODO Store it precalculated? Maybe later.
+ DistributedMetaStorageVersion actualVer = getActualVersion();
+
+ if (remoteVer.id > actualVer.id) {
+ assert startupExtras != null;
+
+ DistributedMetaStorageHistoryItem[] hist = joiningData.hist;
+
+ if (remoteVer.id - actualVer.id <= hist.length) {
+ assert bridge instanceof ReadOnlyDistributedMetaStorageBridge
+ || bridge instanceof EmptyDistributedMetaStorageBridge;
+
+ for (long v = actualVer.id + 1; v <= remoteVer.id; v++)
+ updateLater(hist[(int)(v - remoteVer.id + hist.length - 1)]);
+
+ Serializable nodeData = new DistributedMetaStorageClusterNodeData(remoteVer, null, null, null);
+
+ dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+ }
+ else
+ assert false : "Joining node is too far ahead [remoteVer=" + remoteVer + "]";
+ }
+ else {
+ if (dataBag.commonDataCollectedFor(COMPONENT_ID))
+ return;
+
+ if (remoteVer.id == actualVer.id) {
+ assert remoteVer.equals(actualVer) : actualVer + " " + remoteVer;
+
+ Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver, null, null, null);
+
+ dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+ }
+ else {
+ int availableHistSize = getAvailableHistorySize();
+
+ if (actualVer.id - remoteVer.id <= availableHistSize) {
+ DistributedMetaStorageHistoryItem[] hist = history(remoteVer.id + 1, actualVer.id);
+
+ Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver, null, null, hist);
+
+ dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+ }
+ else {
+ DistributedMetaStorageVersion ver0;
+
+ DistributedMetaStorageHistoryItem[] fullData;
+
+ DistributedMetaStorageHistoryItem[] hist;
+
+ if (startupExtras == null || startupExtras.fullNodeData == null) {
+ ver0 = ver;
+
+ try {
+ fullData = bridge.localFullData();
+ }
+ catch (IgniteCheckedException e) {
+ throw criticalError(e);
+ }
+
+ hist = history(ver.id - histCache.size() + 1, actualVer.id);
+ }
+ else {
+ ver0 = startupExtras.fullNodeData.ver;
+
+ fullData = startupExtras.fullNodeData.fullData;
+
+ hist = startupExtras.fullNodeData.hist;
+ }
+
+ DistributedMetaStorageHistoryItem[] updates;
+
+ if (startupExtras != null)
+ updates = startupExtras.deferredUpdates.toArray(EMPTY_ARRAY);
+ else
+ updates = null;
+
+ Serializable nodeData = new DistributedMetaStorageClusterNodeData(ver0, fullData, hist, updates);
+
+ dataBag.addGridCommonData(COMPONENT_ID, nodeData);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns number of all available history items. Might be a history from remote node snapshot or/and deferred
+ * updates from another remote node. Depends on the current node state.
+ */
+ private int getAvailableHistorySize() {
+ assert Thread.holdsLock(innerStateLock);
+
+ if (startupExtras == null)
+ return histCache.size();
+ else if (startupExtras.fullNodeData == null)
+ return histCache.size() + startupExtras.deferredUpdates.size();
+ else
+ return startupExtras.fullNodeData.hist.length + startupExtras.deferredUpdates.size();
+ }
+
+ /**
+ * Returns actual version from the local node. It is just a version for activated node or calculated future
+ * version otherwise.
+ */
+ private DistributedMetaStorageVersion getActualVersion() {
+ assert Thread.holdsLock(innerStateLock);
+
+ if (startupExtras == null)
+ return ver;
+ else if (startupExtras.fullNodeData == null)
+ return ver.nextVersion(startupExtras.deferredUpdates);
+ else
+ return startupExtras.fullNodeData.ver.nextVersion(startupExtras.deferredUpdates);
+ }
+
+ /**
+ * Returns last update for the specified version.
+ *
+ * @param specificVer Specific version.
+ * @return {@code <key, value>} pair if it was found, {@code null} otherwise.
+ */
+ private DistributedMetaStorageHistoryItem historyItem(long specificVer) {
+ assert Thread.holdsLock(innerStateLock);
+
+ if (startupExtras == null)
+ return histCache.get(specificVer);
+ else {
+ DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+ long notDeferredVer;
+
+ if (fullNodeData == null) {
+ notDeferredVer = ver.id;
+
+ if (specificVer <= notDeferredVer)
+ return histCache.get(specificVer);
+ }
+ else {
+ notDeferredVer = fullNodeData.ver.id;
+
+ if (specificVer <= notDeferredVer) {
+ int idx = (int)(specificVer - notDeferredVer + fullNodeData.hist.length - 1);
+
+ return idx >= 0 ? fullNodeData.hist[idx] : null;
+ }
+ }
+
+ assert specificVer > notDeferredVer;
+
+ int idx = (int)(specificVer - notDeferredVer - 1);
+
+ List<DistributedMetaStorageHistoryItem> deferredUpdates = startupExtras.deferredUpdates;
+
+ if (idx < deferredUpdates.size())
+ return deferredUpdates.get(idx);
+
+ return null;
+ }
+ }
+
+ /**
+ * Returns all updates in the specified range of versions.
+ *
+ * @param startVer Lower bound (inclusive).
+ * @param actualVer Upper bound (inclusive).
+ * @return Array with all requested updates sorted by version in ascending order.
+ */
+ private DistributedMetaStorageHistoryItem[] history(long startVer, long actualVer) {
+ return LongStream.rangeClosed(startVer, actualVer)
+ .mapToObj(this::historyItem)
+ .toArray(DistributedMetaStorageHistoryItem[]::new);
+ }
+
+ /**
+ * {@link DistributedMetaStorageBridge#localFullData()} invoked on {@link #bridge}.
+ */
+ @TestOnly
+ private DistributedMetaStorageHistoryItem[] localFullData() throws IgniteCheckedException {
+ return bridge.localFullData();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+ DistributedMetaStorageClusterNodeData nodeData = (DistributedMetaStorageClusterNodeData)data.commonData();
+
+ if (nodeData != null) {
+ synchronized (innerStateLock) {
+ if (nodeData.fullData == null) {
+ if (nodeData.updates != null) {
+ for (DistributedMetaStorageHistoryItem update : nodeData.updates)
+ updateLater(update);
+ }
+ }
+ else
+ writeFullDataLater(nodeData);
+ }
+ }
+ }
+
+ /**
+ * Common implementation for {@link #write(String, Serializable)} and {@link #remove(String)}. Synchronously waits
+ * for operation to be completed.
+ *
+ * @param key The key.
+ * @param valBytes Value bytes to write. Null if value needs to be removed.
+ * @throws IgniteCheckedException If there was an error while sending discovery message or message was sent but
+ * cluster is not active.
+ */
+ private void startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
+ UUID reqId = UUID.randomUUID();
+
+ GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+ updateFuts.put(reqId, fut);
+
+ DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
+
+ ctx.discovery().sendCustomEvent(msg);
+
+ fut.get();
+ }
+
+ /**
+ * Basically the same as {@link #startWrite(String, byte[])} but for CAS operations.
+ */
+ private boolean startCas(String key, byte[] expValBytes, byte[] newValBytes) throws IgniteCheckedException {
+ UUID reqId = UUID.randomUUID();
+
+ GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+ updateFuts.put(reqId, fut);
+
+ DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
+
+ ctx.discovery().sendCustomEvent(msg);
+
+ return fut.get();
+ }
+
+ /**
+ * Invoked when {@link DistributedMetaStorageUpdateMessage} received. Attempts to store received data (depends on
+ * current {@link #bridge} value). Invokes failure handler with critical error if attempt failed for some reason.
+ *
+ * @param topVer Ignored.
+ * @param node Ignored.
+ * @param msg Received message.
+ */
+ private void onUpdateMessage(
+ AffinityTopologyVersion topVer,
+ ClusterNode node,
+ DistributedMetaStorageUpdateMessage msg
+ ) {
+ if (!isActive()) {
+ msg.setActive(false);
+
+ return;
+ }
+
+ try {
+ U.await(writeAvailable);
+
+ if (msg instanceof DistributedMetaStorageCasMessage)
+ completeCas(bridge, (DistributedMetaStorageCasMessage)msg);
+ else
+ completeWrite(bridge, new DistributedMetaStorageHistoryItem(msg.key(), msg.value()), true);
+ }
+ catch (IgniteCheckedException | Error e) {
+ throw criticalError(e);
+ }
+ }
+
+ /**
+ * Invoked when {@link DistributedMetaStorageUpdateAckMessage} received. Completes future if local node is the node
+ * that initiated write operation.
+ *
+ * @param topVer Ignored.
+ * @param node Ignored.
+ * @param msg Received message.
+ */
+ private void onAckMessage(
+ AffinityTopologyVersion topVer,
+ ClusterNode node,
+ DistributedMetaStorageUpdateAckMessage msg
+ ) {
+ GridFutureAdapter<Boolean> fut = updateFuts.remove(msg.requestId());
+
+ if (fut != null) {
+ if (msg.isActive()) {
+ Boolean res = msg instanceof DistributedMetaStorageCasAckMessage
+ ? ((DistributedMetaStorageCasAckMessage)msg).updated()
+ : null;
+
+ fut.onDone(res);
+ }
+ else
+ fut.onDone(new IllegalStateException("Ignite cluster is not active"));
+ }
+ }
+
+ /**
+ * Invoke failure handler and rethrow passed exception, possibly wrapped into the unchecked one.
+ */
+ private RuntimeException criticalError(Throwable e) {
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+ if (e instanceof Error)
+ throw (Error) e;
+
+ throw U.convertException((IgniteCheckedException)e);
+ }
+
+ /**
+ * Store data in local metastorage or in memory.
+ *
+ * @param bridge Bridge to get the access to the storage.
+ * @param histItem {@code <key, value>} pair to process.
+ * @param notifyListeners Whether listeners should be notified or not. {@code false} for data restore on activation.
+ * @throws IgniteCheckedException In case of IO/unmarshalling errors.
+ */
+ private void completeWrite(
+ DistributedMetaStorageBridge bridge,
+ DistributedMetaStorageHistoryItem histItem,
+ boolean notifyListeners
+ ) throws IgniteCheckedException {
+ Serializable val = notifyListeners ? unmarshal(histItem.valBytes) : null;
+
+ lock();
+
+ try {
+ bridge.onUpdateMessage(histItem, val, notifyListeners);
+
+ bridge.write(histItem.key, histItem.valBytes);
+ }
+ finally {
+ unlock();
+ }
+
+ addToHistoryCache(ver.id, histItem);
+
+ shrinkHistory(bridge);
+ }
+
+ /**
+ * Store data in local metastorage or in memory.
+ *
+ * @param bridge Bridge to get the access to the storage.
+ * @param msg Message with all required data.
+ * @see #completeWrite(DistributedMetaStorageBridge, DistributedMetaStorageHistoryItem, boolean)
+ */
+ private void completeCas(
+ DistributedMetaStorageBridge bridge,
+ DistributedMetaStorageCasMessage msg
+ ) throws IgniteCheckedException {
+ if (!msg.matches())
+ return;
+
+ lock();
+
+ try {
+ Serializable oldVal = bridge.read(msg.key(), true);
+
+ Serializable expVal = unmarshal(msg.expectedValue());
+
+ if (!Objects.deepEquals(oldVal, expVal)) {
+ msg.setMatches(false);
+
+ // Do nothing if expected value doesn't match with the actual one.
+ return;
+ }
+ }
+ finally {
+ unlock();
+ }
+
+ completeWrite(bridge, new DistributedMetaStorageHistoryItem(msg.key(), msg.value()), true);
+ }
+
+ /**
+ * Store current update into the in-memory history cache. {@link #histSizeApproximation} is recalculated during this
+ * process.
+ *
+ * @param ver Version for the update.
+ * @param histItem Update itself.
+ */
+ void addToHistoryCache(long ver, DistributedMetaStorageHistoryItem histItem) {
+ DistributedMetaStorageHistoryItem old = histCache.put(ver, histItem);
+
+ assert old == null : old;
+
+ histSizeApproximation += histItem.estimateSize();
+ }
+
+ /**
+ * Remove specific update from the in-memory history cache. {@link #histSizeApproximation} is recalculated during
+ * this process.
+ *
+ * @param ver Version of the update.
+ */
+ void removeFromHistoryCache(long ver) {
+ DistributedMetaStorageHistoryItem old = histCache.remove(ver);
+
+ if (old != null)
+ histSizeApproximation -= old.estimateSize();
+ }
+
+ /**
+ * Clear in-memory history cache.
+ */
+ void clearHistoryCache() {
+ histCache.clear();
+
+ histSizeApproximation = 0L;
+ }
+
+ /**
+ * Shrikn history so that its estimating size doesn't exceed {@link #histMaxBytes}.
+ */
+ private void shrinkHistory(
+ DistributedMetaStorageBridge bridge
+ ) throws IgniteCheckedException {
+ long maxBytes = histMaxBytes;
+
+ if (histSizeApproximation > maxBytes && histCache.size() > 1) {
+ lock();
+
+ try {
+ while (histSizeApproximation > maxBytes && histCache.size() > 1) {
+ bridge.removeHistoryItem(ver.id + 1 - histCache.size());
+
+ removeFromHistoryCache(ver.id + 1 - histCache.size());
+ }
+ }
+ finally {
+ unlock();
+ }
+ }
+ }
+
+ /**
+ * Add update into the list of deferred updates. Works for inactive nodes only.
+ */
+ private void updateLater(DistributedMetaStorageHistoryItem update) {
+ assert Thread.holdsLock(innerStateLock);
+
+ assert startupExtras != null;
+
+ startupExtras.deferredUpdates.add(update);
+ }
+
+ /**
+ * Invoked at the end of activation.
+ *
+ * @param bridge Bridge to access data storage.
+ * @throws IgniteCheckedException In case of IO/unmarshalling errors.
+ */
+ private void executeDeferredUpdates(DistributedMetaStorageBridge bridge) throws IgniteCheckedException {
+ assert startupExtras != null;
+
+ DistributedMetaStorageHistoryItem lastUpdate = histCache.get(ver.id);
+
+ if (lastUpdate != null) {
+ byte[] valBytes = (byte[])bridge.read(lastUpdate.key, false);
+
+ if (!Arrays.equals(valBytes, lastUpdate.valBytes)) {
+ lock();
+
+ try {
+ bridge.write(lastUpdate.key, lastUpdate.valBytes);
+ }
+ finally {
+ unlock();
+ }
+ }
+ }
+
+ for (DistributedMetaStorageHistoryItem histItem : startupExtras.deferredUpdates)
+ completeWrite(bridge, histItem, false);
+
+ notifyListenersBeforeReadyForWrite(bridge);
+ }
+
+ /**
+ * Notify listeners at the end of activation. Even if there was no data restoring.
+ *
+ * @param bridge Bridge to access data storage.
+ */
+ private void notifyListenersBeforeReadyForWrite(
+ DistributedMetaStorageBridge bridge
+ ) throws IgniteCheckedException {
+ DistributedMetaStorageHistoryItem[] oldData = this.bridge.localFullData();
+
+ DistributedMetaStorageHistoryItem[] newData = bridge.localFullData();
+
+ int oldIdx = 0, newIdx = 0;
+
+ while (oldIdx < oldData.length && newIdx < newData.length) {
+ String oldKey = oldData[oldIdx].key;
+ byte[] oldValBytes = oldData[oldIdx].valBytes;
+
+ String newKey = newData[newIdx].key;
+ byte[] newValBytes = newData[newIdx].valBytes;
+
+ int c = oldKey.compareTo(newKey);
+
+ if (c < 0) {
+ notifyListeners(oldKey, unmarshal(oldValBytes), null);
+
+ ++oldIdx;
+ }
+ else if (c > 0) {
+ notifyListeners(newKey, null, unmarshal(newValBytes));
+
+ ++newIdx;
+ }
+ else {
+ Serializable oldVal = unmarshal(oldValBytes);
+
+ Serializable newVal = Arrays.equals(oldValBytes, newValBytes) ? oldVal : unmarshal(newValBytes);
+
+ notifyListeners(oldKey, oldVal, newVal);
+
+ ++oldIdx;
+
+ ++newIdx;
+ }
+ }
+
+ for (; oldIdx < oldData.length; ++oldIdx)
+ notifyListeners(oldData[oldIdx].key, unmarshal(oldData[oldIdx].valBytes), null);
+
+ for (; newIdx < newData.length; ++newIdx)
+ notifyListeners(newData[newIdx].key, null, unmarshal(newData[newIdx].valBytes));
+ }
+
+ /**
+ * Ultimate version of {@link #updateLater(DistributedMetaStorageHistoryItem)}.
+ *
+ * @param nodeData Data received from remote node.
+ */
+ private void writeFullDataLater(DistributedMetaStorageClusterNodeData nodeData) {
+ assert Thread.holdsLock(innerStateLock);
+
+ assert nodeData.fullData != null;
+
+ startupExtras.fullNodeData = nodeData;
+
+ startupExtras.deferredUpdates.clear();
+
+ if (nodeData.updates != null) {
+ for (DistributedMetaStorageHistoryItem update : nodeData.updates)
+ updateLater(update);
+
+ nodeData.updates = null;
+ }
+ }
+
+ /**
+ * Notify listeners.
+ *
+ * @param key The key.
+ * @param oldVal Old value.
+ * @param newVal New value.
+ */
+ void notifyListeners(String key, Serializable oldVal, Serializable newVal) {
+ for (IgniteBiTuple<Predicate<String>, DistributedMetaStorageListener<Serializable>> entry : lsnrs) {
+ if (entry.get1().test(key)) {
+ try {
+ // ClassCastException might be thrown here for crappy listeners.
+ entry.get2().onUpdate(key, oldVal, newVal);
+ }
+ catch (Exception e) {
+ log.error(S.toString(
+ "Failed to notify global metastorage update listener",
+ "key", key, false,
+ "oldVal", oldVal, false,
+ "newVal", newVal, false,
+ "lsnr", entry.get2(), false
+ ), e);
+ }
+ }
+ }
+ }
+
+ /** Checkpoint read lock. */
+ private void lock() {
+ ctx.cache().context().database().checkpointReadLock();
+ }
+
+ /** Checkpoint read unlock. */
+ private void unlock() {
+ ctx.cache().context().database().checkpointReadUnlock();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageJoiningNodeData.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageJoiningNodeData.java
index ab48afc..660c36d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageJoiningNodeData.java
@@ -14,19 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
import java.io.Serializable;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.NotNull;
-/**
- *
- */
-public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
+/** */
+@SuppressWarnings("PublicField")
+class DistributedMetaStorageJoiningNodeData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public final int bltId;
+
+ /** */
+ public final DistributedMetaStorageVersion ver;
+
/** */
- public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+ public final DistributedMetaStorageHistoryItem[] hist;
/** */
- public void remove(@NotNull String key) throws IgniteCheckedException;
+ public DistributedMetaStorageJoiningNodeData(
+ int bltId,
+ DistributedMetaStorageVersion ver,
+ DistributedMetaStorageHistoryItem[] hist
+ ) {
+ this.bltId = bltId;
+ this.ver = ver;
+ this.hist = hist;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java
new file mode 100644
index 0000000..0e05d93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Request ID. */
+ private final UUID reqId;
+
+ /** */
+ private final boolean active;
+
+ /** */
+ public DistributedMetaStorageUpdateAckMessage(UUID reqId, boolean active) {
+ this.reqId = reqId;
+ this.active = active;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** */
+ public UUID requestId() {
+ return reqId;
+ }
+
+ /** */
+ public boolean isAckMessage() {
+ return true;
+ }
+
+ /** */
+ public boolean isActive() {
+ return active;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public DiscoCache createDiscoCache(
+ GridDiscoveryManager mgr,
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache
+ ) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedMetaStorageUpdateAckMessage.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
new file mode 100644
index 0000000..cc3f37f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Request ID. */
+ private final UUID reqId;
+
+ /** */
+ private final String key;
+
+ /** */
+ private final byte[] valBytes;
+
+ /** */
+ private boolean active = true;
+
+ /** */
+ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
+ this.reqId = reqId;
+ this.key = key;
+ this.valBytes = valBytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** */
+ public UUID requestId() {
+ return reqId;
+ }
+
+ /** */
+ public String key() {
+ return key;
+ }
+
+ /** */
+ public byte[] value() {
+ return valBytes;
+ }
+
+ /** */
+ public boolean isAckMessage() {
+ return false;
+ }
+
+ /** */
+ public void setActive(boolean active) {
+ this.active = active;
+ }
+
+ /** */
+ protected boolean isActive() {
+ return active;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public DiscoveryCustomMessage ackMessage() {
+ return new DistributedMetaStorageUpdateAckMessage(reqId, active);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public DiscoCache createDiscoCache(
+ GridDiscoveryManager mgr,
+ AffinityTopologyVersion topVer,
+ DiscoCache discoCache
+ ) {
+ throw new UnsupportedOperationException("createDiscoCache");
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedMetaStorageUpdateMessage.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
new file mode 100644
index 0000000..33c06e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class DistributedMetaStorageUtil {
+ /**
+ * Common prefix for everything that is going to be written into {@link MetaStorage}. Something that has minimal
+ * chance of collision with the existing keys.
+ */
+ static final String COMMON_KEY_PREFIX = "\u0000";
+
+ /**
+ * Prefix for user keys to store in distributed metastorage.
+ */
+ private static final String KEY_PREFIX = "key-";
+
+ /**
+ * Key for history version.
+ */
+ private static final String HISTORY_VER_KEY = "hist-ver";
+
+ /**
+ * Prefix for history items. Each item will be stored using {@code hist-item-<ver>} key.
+ */
+ private static final String HISTORY_ITEM_KEY_PREFIX = "hist-item-";
+
+ /**
+ * Special key indicating that local data for distributied metastorage is inconsistent because of the ungoing
+ * update/recovery process. Data associated with the key may be ignored.
+ */
+ private static final String CLEANUP_GUARD_KEY = "clean";
+
+ /** */
+ @Nullable public static byte[] marshal(Serializable val) throws IgniteCheckedException {
+ return val == null ? null : JdkMarshaller.DEFAULT.marshal(val);
+ }
+
+ /** */
+ @Nullable public static Serializable unmarshal(byte[] valBytes) throws IgniteCheckedException {
+ return valBytes == null ? null : JdkMarshaller.DEFAULT.unmarshal(valBytes, U.gridClassLoader());
+ }
+
+ /** */
+ public static String localKey(String globalKey) {
+ return localKeyPrefix() + globalKey;
+ }
+
+ /** */
+ public static String globalKey(String locKey) {
+ assert locKey.startsWith(localKeyPrefix()) : locKey;
+
+ return locKey.substring(localKeyPrefix().length());
+ }
+
+ /** */
+ public static String localKeyPrefix() {
+ return COMMON_KEY_PREFIX + KEY_PREFIX;
+ }
+
+ /** */
+ public static String historyItemKey(long ver) {
+ return historyItemPrefix() + ver;
+ }
+
+ /** */
+ public static long historyItemVer(String histItemKey) {
+ assert histItemKey.startsWith(historyItemPrefix());
+
+ return Long.parseLong(histItemKey.substring(historyItemPrefix().length()));
+ }
+
+ /** */
+ public static String historyItemPrefix() {
+ return COMMON_KEY_PREFIX + HISTORY_ITEM_KEY_PREFIX;
+ }
+
+ /** */
+ public static String historyVersionKey() {
+ return COMMON_KEY_PREFIX + HISTORY_VER_KEY;
+ }
+
+ /** */
+ public static String cleanupGuardKey() {
+ return COMMON_KEY_PREFIX + CLEANUP_GUARD_KEY;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageVersion.java
new file mode 100644
index 0000000..7ed775e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageVersion.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** Version class for distributed metastorage. */
+class DistributedMetaStorageVersion implements Serializable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Version with id "0". */
+ public static final DistributedMetaStorageVersion INITIAL_VERSION = new DistributedMetaStorageVersion(0L, 1L);
+
+ /** Incremental rehashing considering new update information. */
+ private static long nextHash(long hash, DistributedMetaStorageHistoryItem update) {
+ return hash * 31L + ((long)update.key.hashCode() << 32) + Arrays.hashCode(update.valBytes);
+ }
+
+ /**
+ * Id is basically a total number of distributed metastorage updates in current cluster.
+ * Increases incrementally on every update starting with zero.
+ *
+ * @see #INITIAL_VERSION
+ */
+ @GridToStringInclude
+ public final long id;
+
+ /**
+ * Hash of the whole updates list. Hashing algorinthm is almost the same as in {@link List#hashCode()}, but with
+ * {@code long} value instead of {@code int}.
+ */
+ @GridToStringInclude
+ public final long hash;
+
+ /**
+ * Constructor with all fields.
+ *
+ * @param id Id.
+ * @param hash Hash.
+ */
+ private DistributedMetaStorageVersion(long id, long hash) {
+ this.id = id;
+ this.hash = hash;
+ }
+
+ /**
+ * Calculate next version considering passed update information.
+ *
+ * @param update Single update.
+ * @return Next version.
+ */
+ public DistributedMetaStorageVersion nextVersion(DistributedMetaStorageHistoryItem update) {
+ return new DistributedMetaStorageVersion(id + 1, nextHash(hash, update));
+ }
+
+ /**
+ * Calculate next version considering passed update information.
+ *
+ * @param updates Updates collection.
+ * @return Next version.
+ */
+ public DistributedMetaStorageVersion nextVersion(Collection<DistributedMetaStorageHistoryItem> updates) {
+ long hash = this.hash;
+
+ for (DistributedMetaStorageHistoryItem update : updates)
+ hash = nextHash(hash, update);
+
+ return new DistributedMetaStorageVersion(id + updates.size(), hash);
+ }
+
+ /**
+ * Calculate next version considering passed update information.
+ *
+ * @param updates Updates array.
+ * @param fromIdx Index of the first required update in the array.
+ * @param toIdx Index after the last required update in the array.
+ * @return Next version.
+ */
+ public DistributedMetaStorageVersion nextVersion(
+ DistributedMetaStorageHistoryItem[] updates,
+ int fromIdx,
+ int toIdx // exclusive
+ ) {
+ long hash = this.hash;
+
+ for (int idx = fromIdx; idx < toIdx; idx++)
+ hash = nextHash(hash, updates[idx]);
+
+ return new DistributedMetaStorageVersion(id + toIdx - fromIdx, hash);
+ }
+
+ /**
+ * Calculate next version considering passed update information.
+ *
+ * @param update Function that provides the update by specific version.
+ * @param fromVer Starting version, inclusive.
+ * @param toVer Ending version, inclusive.
+ * @return Next version.
+ */
+ public DistributedMetaStorageVersion nextVersion(
+ LongFunction<DistributedMetaStorageHistoryItem> update,
+ long fromVer,
+ long toVer // inclusive
+ ) {
+ assert fromVer <= toVer;
+
+ long hash = this.hash;
+
+ for (long idx = fromVer; idx <= toVer; idx++)
+ hash = nextHash(hash, update.apply(idx));
+
+ return new DistributedMetaStorageVersion(id + toVer + 1 - fromVer, hash);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DistributedMetaStorageVersion ver = (DistributedMetaStorageVersion)o;
+
+ return id == ver.id && hash == ver.hash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * Long.hashCode(id) + Long.hashCode(hash);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DistributedMetaStorageVersion.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/EmptyDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/EmptyDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..2043c55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/EmptyDistributedMetaStorageBridge.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+
+/**
+ * Empty metastorage is the specific implementation to be used in in-memory clusters to have distributed metastorage
+ * without any data until cluster is activated.
+ */
+class EmptyDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+ /** {@inheritDoc} */
+ @Override public Serializable read(String globalKey, boolean unmarshal) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void iterate(
+ String globalKeyPrefix,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) {
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(String globalKey, byte[] valBytes) {
+ throw new UnsupportedOperationException("write");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdateMessage(
+ DistributedMetaStorageHistoryItem histItem,
+ Serializable val,
+ boolean notifyListeners
+ ) {
+ throw new UnsupportedOperationException("onUpdateMessage");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeHistoryItem(long ver) {
+ throw new UnsupportedOperationException("removeHistoryItem");
+ }
+
+ /** {@inheritDoc} */
+ @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+ return DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/InMemoryCachedDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/InMemoryCachedDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..1ff14c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/InMemoryCachedDistributedMetaStorageBridge.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
+
+/** */
+class InMemoryCachedDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+ /** */
+ private DistributedMetaStorageImpl dms;
+
+ /** */
+ private final Map<String, byte[]> cache = new ConcurrentSkipListMap<>();
+
+ /** */
+ public InMemoryCachedDistributedMetaStorageBridge(DistributedMetaStorageImpl dms) {
+ this.dms = dms;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException {
+ byte[] valBytes = cache.get(globalKey);
+
+ return unmarshal ? unmarshal(valBytes) : valBytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void iterate(
+ String globalKeyPrefix,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) throws IgniteCheckedException {
+ for (Map.Entry<String, byte[]> entry : cache.entrySet()) {
+ if (entry.getKey().startsWith(globalKeyPrefix))
+ cb.accept(entry.getKey(), unmarshal ? unmarshal(entry.getValue()) : entry.getValue());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(String globalKey, @Nullable byte[] valBytes) {
+ if (valBytes == null)
+ cache.remove(globalKey);
+ else
+ cache.put(globalKey, valBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdateMessage(
+ DistributedMetaStorageHistoryItem histItem,
+ Serializable val,
+ boolean notifyListeners
+ ) throws IgniteCheckedException {
+ dms.ver = dms.ver.nextVersion(histItem);
+
+ if (notifyListeners)
+ dms.notifyListeners(histItem.key, read(histItem.key, true), val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeHistoryItem(long ver) {
+ }
+
+ /** {@inheritDoc} */
+ @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+ return cache.entrySet().stream().map(
+ entry -> new DistributedMetaStorageHistoryItem(entry.getKey(), entry.getValue())
+ ).toArray(DistributedMetaStorageHistoryItem[]::new);
+ }
+
+ /** */
+ public void restore(StartupExtras startupExtras) {
+ if (startupExtras.fullNodeData != null) {
+ DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+ dms.ver = fullNodeData.ver;
+
+ for (DistributedMetaStorageHistoryItem item : fullNodeData.fullData)
+ cache.put(item.key, item.valBytes);
+
+ for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+ DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+
+ dms.addToHistoryCache(dms.ver.id + i + 1 - len, histItem);
+ }
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/NotAvailableDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/NotAvailableDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..5417c2c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/NotAvailableDistributedMetaStorageBridge.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+
+/** */
+class NotAvailableDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+ /** {@inheritDoc} */
+ @Override public Serializable read(String globalKey, boolean unmarshal) {
+ throw new UnsupportedOperationException("read");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void iterate(
+ String globalKeyPre,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) {
+ throw new UnsupportedOperationException("iterate");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(String globalKey, byte[] valBytes) {
+ throw new UnsupportedOperationException("write");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdateMessage(
+ DistributedMetaStorageHistoryItem histItem,
+ Serializable val,
+ boolean notifyListeners
+ ) {
+ throw new UnsupportedOperationException("onUpdateMessage");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeHistoryItem(long ver) {
+ throw new UnsupportedOperationException("removeHistoryItem");
+ }
+
+ /** {@inheritDoc} */
+ @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+ throw new UnsupportedOperationException("localFullData");
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..84c955d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.cleanupGuardKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.globalKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyVersionKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKeyPrefix;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
+
+/** */
+class ReadOnlyDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+ /** */
+ private static final Comparator<DistributedMetaStorageHistoryItem> HISTORY_ITEM_KEY_COMPARATOR =
+ Comparator.comparing(item -> item.key);
+
+ /** */
+ private DistributedMetaStorageHistoryItem[] locFullData;
+
+ /** */
+ private DistributedMetaStorageVersion ver;
+
+ /** */
+ public ReadOnlyDistributedMetaStorageBridge() {
+ }
+
+ /** */
+ public ReadOnlyDistributedMetaStorageBridge(
+ DistributedMetaStorageHistoryItem[] locFullData
+ ) {
+ this.locFullData = locFullData;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException {
+ int idx = Arrays.binarySearch(
+ locFullData,
+ new DistributedMetaStorageHistoryItem(globalKey, null),
+ HISTORY_ITEM_KEY_COMPARATOR
+ );
+
+ if (idx >= 0)
+ return unmarshal ? unmarshal(locFullData[idx].valBytes) : locFullData[idx].valBytes;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void iterate(
+ String globalKeyPrefix,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) throws IgniteCheckedException {
+ int idx = Arrays.binarySearch(
+ locFullData,
+ new DistributedMetaStorageHistoryItem(globalKeyPrefix, null),
+ HISTORY_ITEM_KEY_COMPARATOR
+ );
+
+ if (idx < 0)
+ idx = -1 - idx;
+
+ for (; idx < locFullData.length && locFullData[idx].key.startsWith(globalKeyPrefix); ++idx) {
+ DistributedMetaStorageHistoryItem item = locFullData[idx];
+
+ cb.accept(item.key, unmarshal ? unmarshal(item.valBytes) : item.valBytes);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(String globalKey, byte[] valBytes) {
+ throw new UnsupportedOperationException("write");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdateMessage(
+ DistributedMetaStorageHistoryItem histItem,
+ Serializable val,
+ boolean notifyListeners
+ ) {
+ throw new UnsupportedOperationException("onUpdateMessage");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeHistoryItem(long ver) {
+ throw new UnsupportedOperationException("removeHistoryItem");
+ }
+
+ /** {@inheritDoc} */
+ @Override public DistributedMetaStorageHistoryItem[] localFullData() {
+ return locFullData;
+ }
+
+ /** */
+ public DistributedMetaStorageVersion version() {
+ return ver;
+ }
+
+ /** */
+ public DistributedMetaStorageVersion readInitialData(
+ ReadOnlyMetastorage metastorage,
+ StartupExtras startupExtras
+ ) throws IgniteCheckedException {
+ if (metastorage.readRaw(cleanupGuardKey()) != null) {
+ ver = DistributedMetaStorageVersion.INITIAL_VERSION;
+
+ locFullData = EMPTY_ARRAY;
+
+ return ver;
+ }
+ else {
+ DistributedMetaStorageVersion storedVer =
+ (DistributedMetaStorageVersion)metastorage.read(historyVersionKey());
+
+ if (storedVer == null) {
+ ver = DistributedMetaStorageVersion.INITIAL_VERSION;
+
+ locFullData = EMPTY_ARRAY;
+
+ return ver;
+ }
+ else {
+ ver = storedVer;
+
+ DistributedMetaStorageHistoryItem histItem =
+ (DistributedMetaStorageHistoryItem)metastorage.read(historyItemKey(storedVer.id + 1));
+
+ DistributedMetaStorageHistoryItem[] firstToWrite = {null};
+
+ if (histItem != null) {
+ ver = storedVer.nextVersion(histItem);
+
+ startupExtras.deferredUpdates.add(histItem);
+ }
+ else {
+ histItem = (DistributedMetaStorageHistoryItem)metastorage.read(historyItemKey(storedVer.id));
+
+ if (histItem != null) {
+ byte[] valBytes = metastorage.readRaw(localKey(histItem.key));
+
+ if (!Arrays.equals(valBytes, histItem.valBytes))
+ firstToWrite[0] = histItem;
+ }
+ }
+
+ List<DistributedMetaStorageHistoryItem> locFullDataList = new ArrayList<>();
+
+ metastorage.iterate(
+ localKeyPrefix(),
+ (key, val) -> {
+ String globalKey = globalKey(key);
+
+ if (firstToWrite[0] != null && firstToWrite[0].key.equals(globalKey)) {
+ if (firstToWrite[0].valBytes != null)
+ locFullDataList.add(firstToWrite[0]);
+
+ firstToWrite[0] = null;
+ }
+ else if (firstToWrite[0] != null && firstToWrite[0].key.compareTo(globalKey) < 0) {
+ if (firstToWrite[0].valBytes != null)
+ locFullDataList.add(firstToWrite[0]);
+
+ firstToWrite[0] = null;
+
+ locFullDataList.add(new DistributedMetaStorageHistoryItem(globalKey, (byte[])val));
+ }
+ else
+ locFullDataList.add(new DistributedMetaStorageHistoryItem(globalKey, (byte[])val));
+ },
+ false
+ );
+
+ if (firstToWrite[0] != null && firstToWrite[0].valBytes != null) {
+ locFullDataList.add(
+ new DistributedMetaStorageHistoryItem(firstToWrite[0].key, firstToWrite[0].valBytes)
+ );
+ }
+
+ locFullData = locFullDataList.toArray(EMPTY_ARRAY);
+
+ return storedVer;
+ }
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/StartupExtras.java
similarity index 64%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/StartupExtras.java
index ab48afc..58f8f10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadWriteMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/StartupExtras.java
@@ -14,19 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.persistence.metastorage;
-import java.io.Serializable;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.processors.metastorage.persistence;
-/**
- *
- */
-public interface ReadWriteMetastorage extends ReadOnlyMetastorage {
+import java.util.ArrayList;
+import java.util.List;
+
+/** */
+@SuppressWarnings("PublicField")
+class StartupExtras {
/** */
- public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException;
+ public List<DistributedMetaStorageHistoryItem> deferredUpdates = new ArrayList<>();
/** */
- public void remove(@NotNull String key) throws IgniteCheckedException;
+ public DistributedMetaStorageClusterNodeData fullNodeData;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/WritableDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/WritableDistributedMetaStorageBridge.java
new file mode 100644
index 0000000..37c6181
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/WritableDistributedMetaStorageBridge.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage.persistence;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.COMMON_KEY_PREFIX;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.cleanupGuardKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.globalKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyVersionKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKey;
+import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKeyPrefix;
+
+/** */
+class WritableDistributedMetaStorageBridge implements DistributedMetaStorageBridge {
+ /** */
+ private static final byte[] DUMMY_VALUE = {};
+
+ /** */
+ private final DistributedMetaStorageImpl dms;
+
+ /** */
+ private final ReadWriteMetastorage metastorage;
+
+ /** */
+ public WritableDistributedMetaStorageBridge(DistributedMetaStorageImpl dms, ReadWriteMetastorage metastorage) {
+ this.dms = dms;
+ this.metastorage = metastorage;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Serializable read(String globalKey, boolean unmarshal) throws IgniteCheckedException {
+ return unmarshal ? metastorage.read(localKey(globalKey)) : metastorage.readRaw(localKey(globalKey));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void iterate(
+ String globalKeyPrefix,
+ BiConsumer<String, ? super Serializable> cb,
+ boolean unmarshal
+ ) throws IgniteCheckedException {
+ metastorage.iterate(
+ localKeyPrefix() + globalKeyPrefix,
+ (key, val) -> cb.accept(globalKey(key), val),
+ unmarshal
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(String globalKey, @Nullable byte[] valBytes) throws IgniteCheckedException {
+ if (valBytes == null)
+ metastorage.remove(localKey(globalKey));
+ else
+ metastorage.writeRaw(localKey(globalKey), valBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdateMessage(
+ DistributedMetaStorageHistoryItem histItem,
+ Serializable val,
+ boolean notifyListeners
+ ) throws IgniteCheckedException {
+ metastorage.write(historyItemKey(dms.ver.id + 1), histItem);
+
+ dms.ver = dms.ver.nextVersion(histItem);
+
+ metastorage.write(historyVersionKey(), dms.ver);
+
+ if (notifyListeners)
+ dms.notifyListeners(histItem.key, read(histItem.key, true), val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeHistoryItem(long ver) throws IgniteCheckedException {
+ metastorage.remove(historyItemKey(ver));
+ }
+
+ /** {@inheritDoc} */
+ @Override public DistributedMetaStorageHistoryItem[] localFullData() throws IgniteCheckedException {
+ List<DistributedMetaStorageHistoryItem> locFullData = new ArrayList<>();
+
+ metastorage.iterate(
+ localKeyPrefix(),
+ (key, val) -> locFullData.add(new DistributedMetaStorageHistoryItem(globalKey(key), (byte[])val)),
+ false
+ );
+
+ return locFullData.toArray(EMPTY_ARRAY);
+ }
+
+ /** */
+ public void restore(StartupExtras startupExtras) throws IgniteCheckedException {
+ assert startupExtras != null;
+
+ String cleanupGuardKey = cleanupGuardKey();
+
+ if (metastorage.readRaw(cleanupGuardKey) != null || startupExtras.fullNodeData != null) {
+ metastorage.writeRaw(cleanupGuardKey, DUMMY_VALUE);
+
+ Set<String> allKeys = new HashSet<>();
+
+ metastorage.iterate(COMMON_KEY_PREFIX, (key, val) -> allKeys.add(key), false);
+
+ for (String key : allKeys)
+ metastorage.remove(key);
+
+ if (startupExtras.fullNodeData != null) {
+ DistributedMetaStorageClusterNodeData fullNodeData = startupExtras.fullNodeData;
+
+ dms.ver = fullNodeData.ver;
+
+ dms.clearHistoryCache();
+
+ for (DistributedMetaStorageHistoryItem item : fullNodeData.fullData)
+ metastorage.writeRaw(localKey(item.key), item.valBytes);
+
+ for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+ DistributedMetaStorageHistoryItem histItem = fullNodeData.hist[i];
+
+ long histItemVer = dms.ver.id + i + 1 - len;
+
+ metastorage.write(historyItemKey(histItemVer), histItem);
+
+ dms.addToHistoryCache(histItemVer, histItem);
+ }
+
+ metastorage.write(historyVersionKey(), dms.ver);
+ }
+
+ metastorage.remove(cleanupGuardKey);
+ }
+
+ DistributedMetaStorageVersion storedVer = (DistributedMetaStorageVersion)metastorage.read(historyVersionKey());
+
+ if (storedVer == null)
+ metastorage.write(historyVersionKey(), DistributedMetaStorageVersion.INITIAL_VERSION);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
index 6db7fa5..5e48547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.jetbrains.annotations.NotNull;
/**
@@ -34,10 +35,13 @@ import org.jetbrains.annotations.NotNull;
*/
public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
/** */
- private List<MetastorageLifecycleListener> metastorageListeners = new ArrayList<>();
+ private final List<MetastorageLifecycleListener> metastorageListeners = new ArrayList<>();
/** */
- private List<DatabaseLifecycleListener> databaseListeners = new ArrayList<>();
+ private final List<DistributedMetastorageLifecycleListener> distributedMetastorageListeners = new ArrayList<>();
+
+ /** */
+ private final List<DatabaseLifecycleListener> dbListeners = new ArrayList<>();
/**
@@ -61,15 +65,28 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter {
}
/** */
+ public void registerDistributedMetastorageListener(@NotNull DistributedMetastorageLifecycleListener lsnr) {
+ if (lsnr == null)
+ throw new NullPointerException("Global metastorage subscriber should be not-null.");
+
+ distributedMetastorageListeners.add(lsnr);
+ }
+
+ /** */
+ public List<DistributedMetastorageLifecycleListener> getDistributedMetastorageSubscribers() {
+ return distributedMetastorageListeners;
+ }
+
+ /** */
public void registerDatabaseListener(@NotNull DatabaseLifecycleListener databaseListener) {
if (databaseListener == null)
throw new NullPointerException("Database subscriber should be not-null.");
- databaseListeners.add(databaseListener);
+ dbListeners.add(databaseListener);
}
/** */
public List<DatabaseLifecycleListener> getDatabaseListeners() {
- return databaseListeners;
+ return dbListeners;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
index 742925d..8f7edd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
@@ -68,6 +68,9 @@ import org.jetbrains.annotations.Nullable;
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
*/
public class JdkMarshaller extends AbstractNodeNameAwareMarshaller {
+ /** Default instance. */
+ public static final JdkMarshaller DEFAULT = new JdkMarshaller();
+
/** Class name filter. */
private final IgnitePredicate<String> clsFilter;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java
index cfbe642..a204297 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/encryption/EncryptedCacheDestroyTest.java
@@ -123,11 +123,11 @@ public class EncryptedCacheDestroyTest extends AbstractEncryptionTest {
if (keyShouldBeEmpty) {
assertNull(encKey);
- assertNull(metaStore.getData(ENCRYPTION_KEY_PREFIX + grpId));
+ assertNull(metaStore.readRaw(ENCRYPTION_KEY_PREFIX + grpId));
} else {
assertNotNull(encKey);
- assertNotNull(metaStore.getData(ENCRYPTION_KEY_PREFIX + grpId));
+ assertNotNull(metaStore.readRaw(ENCRYPTION_KEY_PREFIX + grpId));
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
index 32277db..d02bb5e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache.persistence.metastorage;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,6 +39,8 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
@@ -111,7 +115,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
metaStorage.remove(key);
- metaStorage.putData(key, arr/*b.toString().getBytes()*/);
+ metaStorage.writeRaw(key, arr/*b.toString().getBytes()*/);
}
}
finally {
@@ -150,7 +154,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
metaStorage.remove(key);
- metaStorage.putData(key, arr);
+ metaStorage.writeRaw(key, arr);
}
}
finally {
@@ -170,7 +174,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
for (Iterator<IgniteBiTuple<String, byte[]>> it = generateTestData(size, from).iterator(); it.hasNext(); ) {
IgniteBiTuple<String, byte[]> d = it.next();
- metaStorage.putData(d.getKey(), d.getValue());
+ metaStorage.writeRaw(d.getKey(), d.getValue());
res.put(d.getKey(), d.getValue());
}
@@ -314,7 +318,7 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
try {
for (Map.Entry<String, byte[]> v : testData.entrySet())
- metaStorage.putData(v.getKey(), v.getValue());
+ metaStorage.writeRaw(v.getKey(), v.getValue());
}
finally {
db.checkpointReadUnlock();
@@ -483,6 +487,69 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest {
verifyKeys(grid(1), KEYS_CNT, KEY_PREFIX, UPDATED_VAL_PREFIX);
}
+ /**
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testReadOnlyIterationOrder() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ MetaStorage storage = ignite.context().cache().context().database().metaStorage();
+
+ ignite.context().cache().context().database().checkpointReadLock();
+
+ try {
+ storage.write("a", 0);
+
+ storage.write("z", 0);
+
+ storage.write("pref-1", 1);
+
+ storage.write("pref-3", 3);
+
+ storage.write("pref-5", 5);
+
+ storage.write("pref-7", 7);
+
+ GridTestUtils.setFieldValue(storage, "readOnly", true);
+
+ storage.applyUpdate("pref-0", JdkMarshaller.DEFAULT.marshal(0));
+
+ storage.applyUpdate("pref-1", JdkMarshaller.DEFAULT.marshal(10));
+
+ storage.applyUpdate("pref-4", JdkMarshaller.DEFAULT.marshal(4));
+
+ storage.applyUpdate("pref-5", null);
+
+ storage.applyUpdate("pref-8", JdkMarshaller.DEFAULT.marshal(8));
+
+ List<String> keys = new ArrayList<>();
+
+ List<Integer> values = new ArrayList<>();
+
+ storage.iterate("pref", (key, val) -> {
+ keys.add(key);
+
+ values.add((Integer)val);
+ }, true);
+
+ assertEqualsCollections(
+ Arrays.asList("pref-0", "pref-1", "pref-3", "pref-4", "pref-7", "pref-8"),
+ keys
+ );
+
+ assertEqualsCollections(
+ Arrays.asList(0, 10, 3, 4, 7, 8),
+ values
+ );
+ }
+ finally {
+ ignite.context().cache().context().database().checkpointReadUnlock();
+ }
+ }
+
/** */
private void loadKeys(IgniteEx ig,
byte keysCnt,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
new file mode 100644
index 0000000..d3c6cc3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES;
+
+/**
+ * Test for {@link DistributedMetaStorageImpl} with enabled persistence.
+ */
+@RunWith(JUnit4.class)
+public class DistributedMetaStoragePersistentTest extends DistributedMetaStorageTest {
+ /** {@inheritDoc} */
+ @Override protected boolean isPersistent() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void before() throws Exception {
+ super.before();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void after() throws Exception {
+ super.after();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestart() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key", "value");
+
+ stopGrid(0);
+
+ ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ assertEquals("value", ignite.context().distributedMetastorage().read("key"));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJoinDirtyNode() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ startGrid(1);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key1", "value1");
+
+ stopGrid(1);
+
+ stopGrid(0);
+
+ ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key2", "value2");
+
+ IgniteEx newNode = startGrid(1);
+
+ assertEquals("value1", newNode.context().distributedMetastorage().read("key1"));
+
+ assertEquals("value2", newNode.context().distributedMetastorage().read("key2"));
+
+ assertDistributedMetastoragesAreEqual(ignite, newNode);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJoinDirtyNodeFullData() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ IgniteEx ignite = startGrid(0);
+
+ startGrid(1);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key1", "value1");
+
+ stopGrid(1);
+
+ stopGrid(0);
+
+ ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key2", "value2");
+
+ ignite.context().distributedMetastorage().write("key3", "value3");
+
+ IgniteEx newNode = startGrid(1);
+
+ assertEquals("value1", newNode.context().distributedMetastorage().read("key1"));
+
+ assertEquals("value2", newNode.context().distributedMetastorage().read("key2"));
+
+ assertEquals("value3", newNode.context().distributedMetastorage().read("key3"));
+
+ assertDistributedMetastoragesAreEqual(ignite, newNode);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJoinNodeWithLongerHistory() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ startGrid(1);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key1", "value1");
+
+ stopGrid(1);
+
+ ignite.context().distributedMetastorage().write("key2", "value2");
+
+ stopGrid(0);
+
+ ignite = startGrid(1);
+
+ startGrid(0);
+
+ awaitPartitionMapExchange();
+
+ assertEquals("value1", ignite.context().distributedMetastorage().read("key1"));
+
+ assertEquals("value2", ignite.context().distributedMetastorage().read("key2"));
+
+ assertDistributedMetastoragesAreEqual(ignite, grid(0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test @Ignore
+ public void testJoinNodeWithoutEnoughHistory() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ IgniteEx ignite = startGrid(0);
+
+ startGrid(1);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key1", "value1");
+
+ stopGrid(1);
+
+ ignite.context().distributedMetastorage().write("key2", "value2");
+
+ ignite.context().distributedMetastorage().write("key3", "value3");
+
+ stopGrid(0);
+
+ ignite = startGrid(1);
+
+ startGrid(0);
+
+ awaitPartitionMapExchange();
+
+ assertEquals("value1", ignite.context().distributedMetastorage().read("key1"));
+
+ assertEquals("value2", ignite.context().distributedMetastorage().read("key2"));
+
+ assertEquals("value3", ignite.context().distributedMetastorage().read("key3"));
+
+ assertDistributedMetastoragesAreEqual(ignite, grid(0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNamesCollision() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ IgniteCacheDatabaseSharedManager dbSharedMgr = ignite.context().cache().context().database();
+
+ MetaStorage locMetastorage = dbSharedMgr.metaStorage();
+
+ DistributedMetaStorage distributedMetastorage = ignite.context().distributedMetastorage();
+
+ dbSharedMgr.checkpointReadLock();
+
+ try {
+ locMetastorage.write("key", "localValue");
+ }
+ finally {
+ dbSharedMgr.checkpointReadUnlock();
+ }
+
+ distributedMetastorage.write("key", "globalValue");
+
+ dbSharedMgr.checkpointReadLock();
+
+ try {
+ assertEquals("localValue", locMetastorage.read("key"));
+ }
+ finally {
+ dbSharedMgr.checkpointReadUnlock();
+ }
+
+ assertEquals("globalValue", distributedMetastorage.read("key"));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUnstableTopology() throws Exception {
+ int cnt = 8;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ stopGrid(0);
+
+ startGrid(0);
+
+ AtomicInteger gridIdxCntr = new AtomicInteger(0);
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+ int gridIdx = gridIdxCntr.incrementAndGet();
+
+ try {
+ while (!stop.get()) {
+ stopGrid(gridIdx, true);
+
+ Thread.sleep(10L);
+
+ startGrid(gridIdx);
+
+ Thread.sleep(10L);
+ }
+ }
+ catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }, cnt - 1);
+
+ long start = System.currentTimeMillis();
+
+ long duration = GridTestUtils.SF.applyLB(30_000, 5_000);
+
+ try {
+ for (int i = 0; System.currentTimeMillis() < start + duration; i++) {
+ metastorage(0).write(
+ "key" + i, Integer.toString(ThreadLocalRandom.current().nextInt(1000))
+ );
+ }
+ }
+ finally {
+ stop.set(true);
+
+ fut.get();
+ }
+
+ awaitPartitionMapExchange();
+
+ Thread.sleep(3_000L); // Remove later.
+
+ for (int i = 0; i < cnt; i++) {
+ DistributedMetaStorage distributedMetastorage = metastorage(i);
+
+ assertNull(U.field(distributedMetastorage, "startupExtras"));
+ }
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWrongStartOrder1() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ int cnt = 4;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ metastorage(2).write("key1", "value1");
+
+ stopGrid(2);
+
+ metastorage(1).write("key2", "value2");
+
+ stopGrid(1);
+
+ metastorage(0).write("key3", "value3");
+
+ stopGrid(0);
+
+ metastorage(3).write("key4", "value4");
+
+ stopGrid(3);
+
+
+ for (int i = 0; i < cnt; i++)
+ startGrid(i);
+
+ awaitPartitionMapExchange();
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWrongStartOrder2() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ int cnt = 6;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ metastorage(4).write("key1", "value1");
+
+ stopGrid(4);
+
+ metastorage(3).write("key2", "value2");
+
+ stopGrid(3);
+
+ metastorage(0).write("key3", "value3");
+
+ stopGrid(0);
+
+ stopGrid(2);
+
+ metastorage(1).write("key4", "value4");
+
+ stopGrid(1);
+
+ metastorage(5).write("key5", "value5");
+
+ stopGrid(5);
+
+
+ startGrid(1);
+
+ startGrid(0);
+
+ stopGrid(1);
+
+ for (int i = 1; i < cnt; i++)
+ startGrid(i);
+
+ awaitPartitionMapExchange();
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWrongStartOrder3() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ int cnt = 5;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ metastorage(3).write("key1", "value1");
+
+ stopGrid(3);
+
+ stopGrid(0);
+
+ metastorage(2).write("key2", "value2");
+
+ stopGrid(2);
+
+ metastorage(1).write("key3", "value3");
+
+ stopGrid(1);
+
+ metastorage(4).write("key4", "value4");
+
+ stopGrid(4);
+
+
+ startGrid(1);
+
+ startGrid(0);
+
+ stopGrid(1);
+
+ for (int i = 1; i < cnt; i++)
+ startGrid(i);
+
+ awaitPartitionMapExchange();
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWrongStartOrder4() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ int cnt = 6;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ metastorage(4).write("key1", "value1");
+
+ stopGrid(4);
+
+ stopGrid(0);
+
+ metastorage(3).write("key2", "value2");
+
+ stopGrid(3);
+
+ metastorage(2).write("key3", "value3");
+
+ stopGrid(2);
+
+ metastorage(1).write("key4", "value4");
+
+ stopGrid(1);
+
+ metastorage(5).write("key5", "value5");
+
+ stopGrid(5);
+
+
+ startGrid(2);
+
+ startGrid(0);
+
+ stopGrid(2);
+
+ for (int i = 1; i < cnt; i++)
+ startGrid(i);
+
+ awaitPartitionMapExchange();
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test @SuppressWarnings("ThrowableNotThrown")
+ public void testInactiveClusterWrite() throws Exception {
+ startGrid(0);
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> {
+ metastorage(0).write("key", "value");
+
+ return null;
+ }, IllegalStateException.class, "Ignite cluster is not active");
+
+ GridTestUtils.assertThrowsAnyCause(log, () -> {
+ metastorage(0).remove("key");
+
+ return null;
+ }, IllegalStateException.class, "Ignite cluster is not active");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test @SuppressWarnings("ThrowableNotThrown")
+ public void testConflictingData() throws Exception {
+ startGrid(0);
+
+ startGrid(1);
+
+ grid(0).cluster().active(true);
+
+ stopGrid(0);
+
+ metastorage(1).write("key", "value1");
+
+ stopGrid(1);
+
+ startGrid(0);
+
+ grid(0).cluster().active(true);
+
+ metastorage(0).write("key", "value2");
+
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> startGrid(1),
+ IgniteSpiException.class,
+ "Joining node has conflicting distributed metastorage data"
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailover1() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ startGrid(0);
+
+ startGrid(1);
+
+ grid(0).cluster().active(true);
+
+ stopGrid(1);
+
+ metastorage(0).write("key1", "val1");
+
+ metastorage(0).write("key9", "val9");
+
+ IgniteCacheDatabaseSharedManager dbSharedMgr = grid(0).context().cache().context().database();
+
+ dbSharedMgr.checkpointReadLock();
+
+ try {
+ dbSharedMgr.metaStorage().remove("\u0000key-key9");
+ }
+ finally {
+ dbSharedMgr.checkpointReadUnlock();
+ }
+
+ stopGrid(0);
+
+ startGrid(0);
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ assertEquals("val9", metastorage(1).read("key9"));
+
+ assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailover2() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ startGrid(0);
+
+ startGrid(1);
+
+ grid(0).cluster().active(true);
+
+ stopGrid(1);
+
+ metastorage(0).write("key9", "val9");
+
+ metastorage(0).write("key1", "val1");
+
+ IgniteCacheDatabaseSharedManager dbSharedMgr = grid(0).context().cache().context().database();
+
+ dbSharedMgr.checkpointReadLock();
+
+ try {
+ dbSharedMgr.metaStorage().remove("\u0000key-key1");
+ }
+ finally {
+ dbSharedMgr.checkpointReadUnlock();
+ }
+
+ stopGrid(0);
+
+ startGrid(0);
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ assertEquals("val1", metastorage(1).read("key1"));
+
+ assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailover3() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ startGrid(0);
+
+ startGrid(1);
+
+ grid(0).cluster().active(true);
+
+ stopGrid(1);
+
+ metastorage(0).write("key1", "val1");
+
+ metastorage(0).write("key9", "val9");
+
+ metastorage(0).write("key5", "val5");
+
+ IgniteCacheDatabaseSharedManager dbSharedMgr = grid(0).context().cache().context().database();
+
+ dbSharedMgr.checkpointReadLock();
+
+ try {
+ dbSharedMgr.metaStorage().write("\u0000key-key5", "wrong-value");
+ }
+ finally {
+ dbSharedMgr.checkpointReadUnlock();
+ }
+
+ stopGrid(0);
+
+ startGrid(0);
+
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+
+ assertEquals("val5", metastorage(1).read("key5"));
+
+ assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
new file mode 100644
index 0000000..8dbd71a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.metastorage;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES;
+
+/**
+ * Test for {@link DistributedMetaStorageImpl} with disabled persistence.
+ */
+@RunWith(JUnit4.class)
+public class DistributedMetaStorageTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(isPersistent())
+ )
+ );
+
+ return cfg;
+ }
+
+ /**
+ * @return {@code true} for tests with persistent cluster, {@code false} otherwise.
+ */
+ protected boolean isPersistent() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return new StopNodeFailureHandler();
+ }
+
+ /** */
+ @Before
+ public void before() throws Exception {
+ stopAllGrids();
+ }
+
+ /** */
+ @After
+ public void after() throws Exception {
+ stopAllGrids();
+
+ System.clearProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleNode() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ DistributedMetaStorage metastorage = ignite.context().distributedMetastorage();
+
+ assertNull(metastorage.read("key"));
+
+ metastorage.write("key", "value");
+
+ assertEquals("value", metastorage.read("key"));
+
+ metastorage.remove("key");
+
+ assertNull(metastorage.read("key"));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultipleNodes() throws Exception {
+ int cnt = 4;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ for (int i = 0; i < cnt; i++) {
+ String key = UUID.randomUUID().toString();
+
+ String val = UUID.randomUUID().toString();
+
+ metastorage(i).write(key, val);
+
+ for (int j = 0; j < cnt; j++)
+ assertEquals(i + " " + j, val, metastorage(j).read(key));
+ }
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testListenersOnWrite() throws Exception {
+ int cnt = 4;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ AtomicInteger predCntr = new AtomicInteger();
+
+ for (int i = 0; i < cnt; i++) {
+ DistributedMetaStorage metastorage = metastorage(i);
+
+ metastorage.listen(key -> key.startsWith("k"), (key, oldVal, newVal) -> {
+ assertNull(oldVal);
+
+ assertEquals("value", newVal);
+
+ predCntr.incrementAndGet();
+ });
+ }
+
+ metastorage(0).write("key", "value");
+
+ assertEquals(cnt, predCntr.get());
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testListenersOnRemove() throws Exception {
+ int cnt = 4;
+
+ startGridsMultiThreaded(cnt);
+
+ grid(0).cluster().active(true);
+
+ metastorage(0).write("key", "value");
+
+ AtomicInteger predCntr = new AtomicInteger();
+
+ for (int i = 0; i < cnt; i++) {
+ DistributedMetaStorage metastorage = metastorage(i);
+
+ metastorage.listen(key -> key.startsWith("k"), (key, oldVal, newVal) -> {
+ assertEquals("value", oldVal);
+
+ assertNull(newVal);
+
+ predCntr.incrementAndGet();
+ });
+ }
+
+ metastorage(0).remove("key");
+
+ assertEquals(cnt, predCntr.get());
+
+ for (int i = 1; i < cnt; i++)
+ assertDistributedMetastoragesAreEqual(grid(0), grid(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCas() throws Exception {
+ startGrids(2);
+
+ grid(0).cluster().active(true);
+
+ assertFalse(metastorage(0).compareAndSet("key", "expVal", "newVal"));
+
+ assertNull(metastorage(0).read("key"));
+
+ assertFalse(metastorage(0).compareAndRemove("key", "expVal"));
+
+ assertTrue(metastorage(0).compareAndSet("key", null, "val1"));
+
+ assertEquals("val1", metastorage(0).read("key"));
+
+ assertFalse(metastorage(0).compareAndSet("key", null, "val2"));
+
+ assertEquals("val1", metastorage(0).read("key"));
+
+ assertTrue(metastorage(0).compareAndSet("key", "val1", "val3"));
+
+ assertEquals("val3", metastorage(0).read("key"));
+
+ assertFalse(metastorage(0).compareAndRemove("key", "val1"));
+
+ assertEquals("val3", metastorage(0).read("key"));
+
+ assertTrue(metastorage(0).compareAndRemove("key", "val3"));
+
+ assertNull(metastorage(0).read("key"));
+
+ assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJoinCleanNode() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key", "value");
+
+ IgniteEx newNode = startGrid(1);
+
+ assertEquals("value", newNode.context().distributedMetastorage().read("key"));
+
+ assertDistributedMetastoragesAreEqual(ignite, newNode);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testJoinCleanNodeFullData() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ ignite.context().distributedMetastorage().write("key1", "value1");
+
+ ignite.context().distributedMetastorage().write("key2", "value2");
+
+ startGrid(1);
+
+ assertEquals("value1", metastorage(1).read("key1"));
+
+ assertEquals("value2", metastorage(1).read("key2"));
+
+ assertDistributedMetastoragesAreEqual(ignite, grid(1));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDeactivateActivate() throws Exception {
+ System.setProperty(IGNITE_GLOBAL_METASTORAGE_HISTORY_MAX_BYTES, "0");
+
+ startGrid(0);
+
+ grid(0).cluster().active(true);
+
+ metastorage(0).write("key1", "value1");
+
+ metastorage(0).write("key2", "value2");
+
+ grid(0).cluster().active(false);
+
+ startGrid(1);
+
+ CountDownLatch grid1MetaStorageStartLatch = new CountDownLatch(1);
+
+ grid(1).context().internalSubscriptionProcessor().registerDistributedMetastorageListener(
+ new DistributedMetastorageLifecycleListener() {
+ @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
+ grid1MetaStorageStartLatch.countDown();
+ }
+ }
+ );
+
+ grid(0).cluster().active(true);
+
+ assertEquals("value1", metastorage(0).read("key1"));
+
+ assertEquals("value2", metastorage(0).read("key2"));
+
+ grid1MetaStorageStartLatch.await(1, TimeUnit.SECONDS);
+
+ assertDistributedMetastoragesAreEqual(grid(0), grid(1));
+ }
+
+ /**
+ * @return {@link DistributedMetaStorage} instance for i'th node.
+ */
+ protected DistributedMetaStorage metastorage(int i) {
+ return grid(i).context().distributedMetastorage();
+ }
+
+ /**
+ * Assert that two nodes have the same internal state in {@link DistributedMetaStorage}.
+ */
+ protected void assertDistributedMetastoragesAreEqual(IgniteEx ignite1, IgniteEx ignite2) throws Exception {
+ DistributedMetaStorage distributedMetastorage1 = ignite1.context().distributedMetastorage();
+
+ DistributedMetaStorage distributedMetastorage2 = ignite2.context().distributedMetastorage();
+
+ Object ver1 = U.field(distributedMetastorage1, "ver");
+
+ Object ver2 = U.field(distributedMetastorage2, "ver");
+
+ assertEquals(ver1, ver2);
+
+ Object histCache1 = U.field(distributedMetastorage1, "histCache");
+
+ Object histCache2 = U.field(distributedMetastorage2, "histCache");
+
+ assertEquals(histCache1, histCache2);
+
+ Method fullDataMtd = U.findNonPublicMethod(DistributedMetaStorageImpl.class, "localFullData");
+
+ Object[] fullData1 = (Object[])fullDataMtd.invoke(distributedMetastorage1);
+
+ Object[] fullData2 = (Object[])fullDataMtd.invoke(distributedMetastorage2);
+
+ assertEqualsCollections(Arrays.asList(fullData1), Arrays.asList(fullData2));
+
+ // Also check that arrays are sorted.
+ Arrays.sort(fullData1, Comparator.comparing(o -> U.field(o, "key")));
+
+ assertEqualsCollections(Arrays.asList(fullData1), Arrays.asList(fullData2));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 7fba2d1..8a31d96 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -53,6 +53,8 @@ import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTes
import org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheStoreTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodePutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeTinyPutGetTest;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStoragePersistentTest;
+import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorageTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -170,6 +172,8 @@ public class IgnitePdsTestSuite {
//MetaStorage
GridTestUtils.addTestIfNeeded(suite, IgniteMetaStorageBasicTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, DistributedMetaStorageTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, DistributedMetaStoragePersistentTest.class, ignoredTests);
}
/** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 973946d..45ee140 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -1018,7 +1018,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
sharedCtx0.database().checkpointReadLock();
try {
- storage0.putData(String.valueOf(i), new byte[] {(byte)(i % 256), 2, 3});
+ storage0.writeRaw(String.valueOf(i), new byte[] {(byte)(i % 256), 2, 3});
}
finally {
sharedCtx0.database().checkpointReadUnlock();
@@ -1032,7 +1032,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
sharedCtx1.database().checkpointReadLock();
try {
- storage1.putData(String.valueOf(i), b1);
+ storage1.writeRaw(String.valueOf(i), b1);
}
finally {
sharedCtx1.database().checkpointReadUnlock();
@@ -1040,13 +1040,13 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
}
for (int i = 0; i < cnt; i++) {
- byte[] d1 = storage0.getData(String.valueOf(i));
+ byte[] d1 = storage0.readRaw(String.valueOf(i));
assertEquals(3, d1.length);
assertEquals((byte)(i % 256), d1[0]);
assertEquals(2, d1[1]);
assertEquals(3, d1[2]);
- byte[] d2 = storage1.getData(String.valueOf(i));
+ byte[] d2 = storage1.readRaw(String.valueOf(i));
assertEquals(i + 3, d2.length);
assertEquals(1, d2[0]);
assertEquals(2, d2[1]);
@@ -1079,7 +1079,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
sharedCtx.database().checkpointReadLock();
try {
- storage.putData(String.valueOf(i), b1);
+ storage.writeRaw(String.valueOf(i), b1);
}
finally {
sharedCtx.database().checkpointReadUnlock();
@@ -1087,7 +1087,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
}
for (int i = 0; i < cnt; i++) {
- byte[] d2 = storage.getData(String.valueOf(i));
+ byte[] d2 = storage.readRaw(String.valueOf(i));
assertEquals(arraySize, d2.length);
for (int k = 0; k < arraySize; k++) {
@@ -1117,7 +1117,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
sharedCtx0.database().checkpointReadLock();
try {
- storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
+ storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
}
finally {
sharedCtx0.database().checkpointReadUnlock();
@@ -1136,7 +1136,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
}
for (int i = 10; i < cnt; i++) {
- byte[] d1 = storage.getData(String.valueOf(i));
+ byte[] d1 = storage.readRaw(String.valueOf(i));
assertEquals(3, d1.length);
assertEquals(1, d1[0]);
assertEquals(2, d1[1]);
@@ -1166,7 +1166,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
sharedCtx0.database().checkpointReadLock();
try {
- storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
+ storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
}
finally {
sharedCtx0.database().checkpointReadUnlock();
@@ -1177,7 +1177,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
sharedCtx0.database().checkpointReadLock();
try {
- storage.putData(String.valueOf(i), new byte[] {2, 2, 3, 4});
+ storage.writeRaw(String.valueOf(i), new byte[] {2, 2, 3, 4});
}
finally {
sharedCtx0.database().checkpointReadUnlock();
@@ -1185,7 +1185,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
}
for (int i = 0; i < cnt; i++) {
- byte[] d1 = storage.getData(String.valueOf(i));
+ byte[] d1 = storage.readRaw(String.valueOf(i));
assertEquals(4, d1.length);
assertEquals(2, d1[0]);
assertEquals(2, d1[1]);
@@ -1218,7 +1218,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
sharedCtx0.database().checkpointReadLock();
try {
- storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
+ storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
}
finally {
sharedCtx0.database().checkpointReadUnlock();
@@ -1226,7 +1226,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
}
for (int i = 0; i < cnt; i++) {
- byte[] value = storage.getData(String.valueOf(i));
+ byte[] value = storage.readRaw(String.valueOf(i));
assert value != null;
assert value.length == 3;
}
@@ -1244,7 +1244,7 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
assert storage != null;
for (int i = 0; i < cnt; i++) {
- byte[] value = storage.getData(String.valueOf(i));
+ byte[] value = storage.readRaw(String.valueOf(i));
assert value != null;
}
}