You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2019/08/09 13:51:38 UTC
[ignite] branch master updated: IGNITE-9562 Destroyed cache that resurrected on an old offline node breaks PME - Fixes #6748.
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 27e9f70 IGNITE-9562 Destroyed cache that resurrected on an old offline node breaks PME - Fixes #6748.
27e9f70 is described below
commit 27e9f705c1f65baae20b7dc3c03e988217dbe3f6
Author: Eduard Shangareev <ed...@gmail.com>
AuthorDate: Fri Aug 9 16:50:45 2019 +0300
IGNITE-9562 Destroyed cache that resurrected on an old offline node breaks PME - Fixes #6748.
Signed-off-by: Ivan Rakov <ir...@apache.org>
---
.../cache/CacheJoinNodeDiscoveryData.java | 7 +
.../internal/processors/cache/CachesRegistry.java | 33 +-
.../processors/cache/ClusterCachesInfo.java | 70 +-
.../cache/DynamicCacheChangeRequest.java | 15 +-
.../processors/cache/GridCacheProcessor.java | 989 +++------------------
.../processors/cache/GridLocalConfigManager.java | 333 +++++++
.../cache/ValidationOnNodeJoinUtils.java | 746 ++++++++++++++++
.../distributed/CacheBaselineTopologyTest.java | 6 +-
.../persistence/IgnitePdsDestroyCacheTest.java | 3 +
.../GridMarshallerMappingConsistencyTest.java | 2 +-
.../db/IgniteCacheGroupsWithRestartsTest.java | 118 ++-
11 files changed, 1382 insertions(+), 940 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index 4ab3266..9611899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -163,6 +163,13 @@ public class CacheJoinNodeDiscoveryData implements Serializable {
}
/**
+ * @return Long which bits represent some flags.
+ */
+ public long getFlags() {
+ return flags;
+ }
+
+ /**
* @param ois ObjectInputStream.
*/
private void readObject(ObjectInputStream ois)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
index 3b906ca..6010cc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachesRegistry.java
@@ -31,6 +31,7 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -257,7 +258,11 @@ public class CachesRegistry {
if (cachesToPersist.isEmpty())
return cachesConfPersistFuture = new GridFinishedFuture<>();
- return cachesConfPersistFuture = persistCacheConfigurations(cachesToPersist);
+ List<StoredCacheData> cacheConfigsToPersist = cacheDescriptors.stream()
+ .map(desc -> desc.toStoredData(cctx.cache().splitter()))
+ .collect(Collectors.toList());
+
+ return cachesConfPersistFuture = persistCacheConfigurations(cacheConfigsToPersist);
}
/**
@@ -273,16 +278,12 @@ public class CachesRegistry {
}
/**
- * Persists cache configurations from given {@code cacheDescriptors}.
+ * Persists cache configurations.
*
- * @param cacheDescriptors Cache descriptors to retrieve cache configurations.
+ * @param cacheConfigsToPersist Cache configurations to persist.
* @return Future that will be completed when all cache configurations will be persisted to cache work directory.
*/
- private IgniteInternalFuture<?> persistCacheConfigurations(List<DynamicCacheDescriptor> cacheDescriptors) {
- List<StoredCacheData> cacheConfigsToPersist = cacheDescriptors.stream()
- .map(desc -> desc.toStoredData(cctx.cache().splitter()))
- .collect(Collectors.toList());
-
+ private IgniteInternalFuture<?> persistCacheConfigurations(List<StoredCacheData> cacheConfigsToPersist) {
// Pre-create cache work directories if they don't exist.
for (StoredCacheData data : cacheConfigsToPersist) {
try {
@@ -297,13 +298,15 @@ public class CachesRegistry {
}
}
- return cctx.kernalContext().closure().runLocalSafe(() -> {
- try {
- for (StoredCacheData data : cacheConfigsToPersist)
- cctx.pageStore().storeCacheData(data, false);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Error while saving cache configurations on disk", e);
+ return cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+ @Override public void run() {
+ try {
+ for (StoredCacheData data : cacheConfigsToPersist)
+ cctx.cache().saveCacheConfiguration(data, false);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Error while saving cache configurations on disk", e);
+ }
}
});
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 3df638d..13ef63c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -33,6 +33,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheExistsException;
@@ -151,15 +152,13 @@ class ClusterCachesInfo {
* Filters all dynamic cache descriptors and groups that were not presented on node start
* and were received with grid discovery data.
*
- * @param localConfigData node's local cache configurations
- * (both from static config and stored with persistent caches).
- *
+ * @param localCachesOnStart Caches which were already presented on node start.
*/
- public void filterDynamicCacheDescriptors(CacheJoinNodeDiscoveryData localConfigData) {
+ public void filterDynamicCacheDescriptors(Set<String> localCachesOnStart) {
if (ctx.isDaemon())
return;
- filterRegisteredCachesAndCacheGroups(localConfigData.caches());
+ filterRegisteredCachesAndCacheGroups(localCachesOnStart);
List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = locJoinCachesCtx.caches();
@@ -183,14 +182,14 @@ class ClusterCachesInfo {
*
* @param locCaches Caches from local node configuration (static configuration and persistent caches).
*/
- private void filterRegisteredCachesAndCacheGroups(Map<String, CacheJoinNodeDiscoveryData.CacheInfo> locCaches) {
+ private void filterRegisteredCachesAndCacheGroups(Set<String> locCaches) {
//filter registered caches
Iterator<Map.Entry<String, DynamicCacheDescriptor>> cachesIter = registeredCaches.entrySet().iterator();
while (cachesIter.hasNext()) {
Map.Entry<String, DynamicCacheDescriptor> e = cachesIter.next();
- if (!locCaches.containsKey(e.getKey())) {
+ if (!locCaches.contains(e.getKey())) {
cachesIter.remove();
ctx.discovery().removeCacheFilter(e.getKey());
@@ -1150,23 +1149,23 @@ class ClusterCachesInfo {
/**
* @param dataBag Discovery data bag.
+ * @param splitter Cache configuration splitter.
*/
- public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ public void collectGridNodeData(DiscoveryDataBag dataBag, CacheConfigurationSplitter splitter) {
if (ctx.isDaemon())
return;
if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
- dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData());
+ dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData(splitter));
}
/**
* @return Information about started caches.
+ * @param cfgSplitter Cache configuration splitter.
*/
- private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+ private CacheNodeCommonDiscoveryData collectCommonDiscoveryData(CacheConfigurationSplitter cfgSplitter) {
Map<Integer, CacheGroupData> cacheGrps = new HashMap<>();
- CacheConfigurationSplitter cfgSplitter = ctx.cache().backwardCompatibleSplitter();
-
for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(grpDesc);
@@ -1789,6 +1788,53 @@ class ClusterCachesInfo {
}
/**
+ * @param data Joining node data.
+ * @return Message with error or null if everything was OK.
+ */
+ public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+ if (data.hasJoiningNodeData()) {
+ Serializable joiningNodeData = data.joiningNodeData();
+
+ if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
+ CacheJoinNodeDiscoveryData joinData = (CacheJoinNodeDiscoveryData)joiningNodeData;
+
+ Set<String> problemCaches = null;
+
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
+ CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
+
+ if (!registeredCaches.containsKey(cfg.getName())) {
+ String conflictErr = checkCacheConflict(cfg);
+
+ if (conflictErr != null) {
+ U.warn(log, "Ignore cache received from joining node. " + conflictErr);
+
+ continue;
+ }
+
+ long flags = cacheInfo.getFlags();
+
+ if (flags == 1L) {
+ if (problemCaches == null)
+ problemCaches = new HashSet<>();
+
+ problemCaches.add(cfg.getName());
+ }
+ }
+ }
+
+ if (!F.isEmpty(problemCaches))
+ return problemCaches.stream().collect(Collectors.joining(", ",
+ "Joining node has caches with data which are not presented on cluster, " +
+ "it could mean that they were already destroyed, to add the node to cluster - " +
+ "remove directories with the caches[", "]"));
+ }
+ }
+
+ return null;
+ }
+
+ /**
* @param clientData Discovery data.
* @param clientNodeId Client node ID.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 3032dc8..231d843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -134,21 +134,22 @@ public class DynamicCacheChangeRequest implements Serializable {
/**
* @param ctx Context.
* @param cfg0 Template configuration.
+ * @param splitCfg Cache configuration splitter.
* @return Request to add template.
*/
- static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?, ?> cfg0) {
- CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
-
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg.getName(), ctx.localNodeId());
+ static DynamicCacheChangeRequest addTemplateRequest(
+ GridKernalContext ctx,
+ CacheConfiguration<?, ?> cfg0,
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg
+ ) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg0.getName(), ctx.localNodeId());
req.template(true);
- T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = ctx.cache().backwardCompatibleSplitter().split(cfg);
-
req.startCacheConfiguration(splitCfg.get1());
req.cacheConfigurationEnrichment(splitCfg.get2());
- req.schema(new QuerySchema(cfg.getQueryEntities()));
+ req.schema(new QuerySchema(cfg0.getQueryEntities()));
req.deploymentId(IgniteUuid.randomUuid());
return req;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7ba8ae3..8eb6164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -20,12 +20,10 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -36,12 +34,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import javax.cache.configuration.FactoryBuilder;
-import javax.cache.expiry.EternalExpiryPolicy;
-import javax.cache.expiry.ExpiryPolicy;
import javax.management.MBeanServer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
@@ -49,24 +45,18 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
-import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.events.EventType;
@@ -76,7 +66,6 @@ import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
@@ -90,7 +79,6 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData.CacheInfo;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
@@ -146,8 +134,6 @@ import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTa
import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
-import org.apache.ignite.internal.processors.security.OperationSecurityContext;
-import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
@@ -177,7 +163,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
import org.apache.ignite.mxbean.IgniteMBeanAware;
import org.apache.ignite.plugin.security.SecurityException;
@@ -186,9 +171,6 @@ import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
-import org.apache.ignite.spi.indexing.IndexingSpi;
-import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -200,23 +182,16 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
-import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
-import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.IgniteFeatures.TRANSACTION_OWNER_THREAD_DUMP_PROVIDING;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isDefaultDataRegionPersistent;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
-import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeSecurityContext;
+import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers;
import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
/**
@@ -224,15 +199,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
*/
@SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
public class GridCacheProcessor extends GridProcessorAdapter {
- /** Template of message of conflicts during configuration merge*/
- private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
- "Conflicts during configuration merge for cache '%s' : \n%s";
-
- /** Template of message of node join was fail because it requires to merge of config */
- private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " +
- "(the config of the cache '%s' has to be merged which is impossible on active grid). " +
- "Deactivate grid and retry node join or clean the joining node.";
-
/** Invalid region configuration message. */
private static final String INVALID_REGION_CONFIGURATION_MESSAGE = "Failed to join node " +
"(Incompatible data region configuration [region=%s, locNodeId=%s, isPersistenceEnabled=%s, rmtNodeId=%s, isPersistenceEnabled=%s])";
@@ -247,10 +213,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** */
private static final String CACHE_NAMES_AND_OPERATION_FORMAT = "[cacheNames=%s, operation=%s]";
- /** */
- private final boolean startClientCaches =
- IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false);
-
/** Enables start caches in parallel. */
private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, true);
@@ -265,6 +227,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** */
private final ConcurrentMap<Integer, CacheGroupContext> cacheGrps = new ConcurrentHashMap<>();
+ /** Flag that caches were already filtered out. */
+ private final AtomicBoolean alreadyFiltered = new AtomicBoolean();
+
/** */
private final Map<String, GridCacheAdapter<?, ?>> caches;
@@ -274,9 +239,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Map of proxies. */
private final ConcurrentHashMap<String, IgniteCacheProxyImpl<?, ?>> jCacheProxies;
- /** Caches stop sequence. */
- private final Deque<String> stopSeq;
-
/** Transaction interface implementation. */
private IgniteTransactionsImpl transactions;
@@ -297,6 +259,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
private ClusterCachesInfo cachesInfo;
/** */
+ private GridLocalConfigManager locCfgMgr;
+
+ /** */
private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
/** Must use JDK marsh since it is used by discovery to fire custom events. */
@@ -320,9 +285,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Tmp storage for meta migration. */
private MetaStorage.TmpStorage tmpStorage;
- /** Node's local cache configurations (both from static configuration and from persistent caches). */
- private CacheJoinNodeDiscoveryData localConfigs;
-
/** Cache configuration splitter. */
private CacheConfigurationSplitter splitter;
@@ -337,7 +299,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
caches = new ConcurrentHashMap<>();
jCacheProxies = new ConcurrentHashMap<>();
- stopSeq = new LinkedList<>();
internalCaches = new HashSet<>();
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
@@ -346,19 +307,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param cfg Initializes cache configuration with proper defaults.
- * @param cacheObjCtx Cache object context.
- * @throws IgniteCheckedException If configuration is not valid.
- */
- private void initialize(CacheConfiguration cfg, CacheObjectContext cacheObjCtx)
- throws IgniteCheckedException {
- CU.initializeConfigDefaults(log, cfg, cacheObjCtx);
-
- ctx.coordinators().preProcessCacheConfiguration(cfg);
- ctx.igfsHelper().preProcessCacheConfiguration(cfg);
- }
-
- /**
* @param cfg Configuration to check for possible performance issues.
* @param hasStore {@code True} if store is configured.
*/
@@ -487,241 +435,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param c Ignite Configuration.
- * @param cc Cache Configuration.
- * @return {@code true} if cache is starting on client node and this node is affinity node for the cache.
- */
- private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) {
- if (c.isClientMode() && c.getDataStorageConfiguration() == null) {
- if (cc.getCacheMode() == LOCAL)
- return true;
-
- return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
-
- }
- else
- return false;
- }
-
- /**
- * @param c Ignite configuration.
- * @param cc Configuration to validate.
- * @param cacheType Cache type.
- * @param cfgStore Cache store.
- * @throws IgniteCheckedException If failed.
- */
- private void validate(IgniteConfiguration c,
- CacheConfiguration cc,
- CacheType cacheType,
- @Nullable CacheStore cfgStore) throws IgniteCheckedException {
- assertParameter(cc.getName() != null && !cc.getName().isEmpty(), "name is null or empty");
-
- if (cc.getCacheMode() == REPLICATED) {
- if (cc.getNearConfiguration() != null &&
- ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
- U.warn(log, "Near cache cannot be used with REPLICATED cache, " +
- "will be ignored [cacheName=" + U.maskName(cc.getName()) + ']');
-
- cc.setNearConfiguration(null);
- }
- }
-
- if (storesLocallyOnClient(c, cc))
- throw new IgniteCheckedException("DataRegion for client caches must be explicitly configured " +
- "on client node startup. Use DataStorageConfiguration to configure DataRegion.");
-
- if (cc.getCacheMode() == LOCAL && !cc.getAffinity().getClass().equals(LocalAffinityFunction.class))
- U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
- U.maskName(cc.getName()) + ']');
-
- if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
- throw new IgniteCheckedException("Cannot have more than " + CacheConfiguration.MAX_PARTITIONS_COUNT +
- " partitions [cacheName=" + cc.getName() + ", partitions=" + cc.getAffinity().partitions() + ']');
-
- if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
- assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
- assertParameter(cc.getRebalanceTimeout() >= 0, "rebalanceTimeout >= 0");
- assertParameter(cc.getRebalanceThrottle() >= 0, "rebalanceThrottle >= 0");
- assertParameter(cc.getRebalanceBatchesPrefetchCount() > 0, "rebalanceBatchesPrefetchCount > 0");
- }
-
- if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
- if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
- U.warn(log, "Cache write synchronization mode is set to FULL_ASYNC. All single-key 'put' and " +
- "'remove' operations will return 'null', all 'putx' and 'removex' operations will return" +
- " 'true' [cacheName=" + U.maskName(cc.getName()) + ']');
- }
-
- DeploymentMode depMode = c.getDeploymentMode();
-
- if (c.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) &&
- !CU.isSystemCache(cc.getName()) && !(c.getMarshaller() instanceof BinaryMarshaller))
- throw new IgniteCheckedException("Cache can be started in PRIVATE or ISOLATED deployment mode only when" +
- " BinaryMarshaller is used [depMode=" + ctx.config().getDeploymentMode() + ", marshaller=" +
- c.getMarshaller().getClass().getName() + ']');
-
- if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
- throw new IgniteCheckedException("Affinity function must return at most " +
- CacheConfiguration.MAX_PARTITIONS_COUNT + " partitions [actual=" + cc.getAffinity().partitions() +
- ", affFunction=" + cc.getAffinity() + ", cacheName=" + cc.getName() + ']');
-
- if (cc.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
- assertParameter(cc.getCacheMode() != LOCAL,
- "LOCAL cache mode cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
- assertParameter(cc.getNearConfiguration() == null,
- "near cache cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
- assertParameter(!cc.isReadThrough(),
- "readThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
- assertParameter(!cc.isWriteThrough(),
- "writeThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
- assertParameter(!cc.isWriteBehindEnabled(),
- "writeBehindEnabled cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
- assertParameter(cc.getRebalanceMode() != NONE,
- "Rebalance mode NONE cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
- ExpiryPolicy expPlc = null;
-
- if (cc.getExpiryPolicyFactory() instanceof FactoryBuilder.SingletonFactory)
- expPlc = (ExpiryPolicy)cc.getExpiryPolicyFactory().create();
-
- if (!(expPlc instanceof EternalExpiryPolicy)) {
- assertParameter(cc.getExpiryPolicyFactory() == null,
- "expiry policy cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
- }
-
- assertParameter(cc.getInterceptor() == null,
- "interceptor cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
-
- // Disable in-memory evictions for mvcc cache. TODO IGNITE-10738
- String memPlcName = cc.getDataRegionName();
- DataRegion dataRegion = sharedCtx.database().dataRegion(memPlcName);
-
- if (dataRegion != null && !dataRegion.config().isPersistenceEnabled() &&
- dataRegion.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED) {
- throw new IgniteCheckedException("Data pages evictions cannot be used with TRANSACTIONAL_SNAPSHOT " +
- "cache atomicity mode for in-memory regions. Please, either disable evictions or enable " +
- "persistence for data regions with TRANSACTIONAL_SNAPSHOT caches. [cacheName=" + cc.getName() +
- ", dataRegionName=" + memPlcName + ", pageEvictionMode=" +
- dataRegion.config().getPageEvictionMode() + ']');
- }
-
- IndexingSpi idxSpi = ctx.config().getIndexingSpi();
-
- assertParameter(idxSpi == null || idxSpi instanceof NoopIndexingSpi,
- "Custom IndexingSpi cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
- }
-
- if (cc.isWriteBehindEnabled() && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
- if (cfgStore == null)
- throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " +
- "for cache: " + U.maskName(cc.getName()));
-
- assertParameter(cc.getWriteBehindBatchSize() > 0, "writeBehindBatchSize > 0");
- assertParameter(cc.getWriteBehindFlushSize() >= 0, "writeBehindFlushSize >= 0");
- assertParameter(cc.getWriteBehindFlushFrequency() >= 0, "writeBehindFlushFrequency >= 0");
- assertParameter(cc.getWriteBehindFlushThreadCount() > 0, "writeBehindFlushThreadCount > 0");
-
- if (cc.getWriteBehindFlushSize() == 0 && cc.getWriteBehindFlushFrequency() == 0)
- throw new IgniteCheckedException("Cannot set both 'writeBehindFlushFrequency' and " +
- "'writeBehindFlushSize' parameters to 0 for cache: " + U.maskName(cc.getName()));
- }
-
- if (cc.isReadThrough() && cfgStore == null
- && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
- throw new IgniteCheckedException("Cannot enable read-through (loader or store is not provided) " +
- "for cache: " + U.maskName(cc.getName()));
-
- if (cc.isWriteThrough() && cfgStore == null
- && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
- throw new IgniteCheckedException("Cannot enable write-through (writer or store is not provided) " +
- "for cache: " + U.maskName(cc.getName()));
-
- long delay = cc.getRebalanceDelay();
-
- if (delay != 0) {
- if (cc.getCacheMode() != PARTITIONED)
- U.warn(log, "Rebalance delay is supported only for partitioned caches (will ignore): " + (cc.getName()));
- else if (cc.getRebalanceMode() == SYNC) {
- if (delay < 0) {
- U.warn(log, "Ignoring SYNC rebalance mode with manual rebalance start (node will not wait for " +
- "rebalancing to be finished): " + U.maskName(cc.getName()));
- }
- else {
- U.warn(log, "Using SYNC rebalance mode with rebalance delay (node will wait until rebalancing is " +
- "initiated for " + delay + "ms) for cache: " + U.maskName(cc.getName()));
- }
- }
- }
-
- ctx.igfsHelper().validateCacheConfiguration(cc);
- ctx.coordinators().validateCacheConfiguration(cc);
-
- if (cc.getAtomicityMode() == ATOMIC)
- assertParameter(cc.getTransactionManagerLookupClassName() == null,
- "transaction manager can not be used with ATOMIC cache");
-
- if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null) && !cc.isOnheapCacheEnabled())
- throw new IgniteCheckedException("Onheap cache must be enabled if eviction policy is configured [cacheName="
- + U.maskName(cc.getName()) + "]");
-
- if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isDataStructureCache(cc.getName()))
- throw new IgniteCheckedException("Using cache names reserved for datastructures is not allowed for " +
- "other cache types [cacheName=" + cc.getName() + ", cacheType=" + cacheType + "]");
-
- if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isReservedGroup(cc.getGroupName()))
- throw new IgniteCheckedException("Using cache group names reserved for datastructures is not allowed for " +
- "other cache types [cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
- ", cacheType=" + cacheType + "]");
-
- // Make sure we do not use sql schema for system views.
- if (ctx.query().moduleEnabled()) {
- String schema = QueryUtils.normalizeSchemaName(cc.getName(), cc.getSqlSchema());
-
- if (F.eq(schema, QueryUtils.SCHEMA_SYS)) {
- if (cc.getSqlSchema() == null) {
- // Conflict on cache name.
- throw new IgniteCheckedException("SQL schema name derived from cache name is reserved (" +
- "please set explicit SQL schema name through CacheConfiguration.setSqlSchema() or choose " +
- "another cache name) [cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + "]");
- }
- else {
- // Conflict on schema name.
- throw new IgniteCheckedException("SQL schema name is reserved (please choose another one) [" +
- "cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + ']');
- }
- }
- }
-
- if (cc.isEncryptionEnabled() && !ctx.clientNode()) {
- StringBuilder cacheSpec = new StringBuilder("[cacheName=").append(cc.getName())
- .append(", groupName=").append(cc.getGroupName())
- .append(", cacheType=").append(cacheType)
- .append(']');
-
- if (!CU.isPersistentCache(cc, c.getDataStorageConfiguration())) {
- throw new IgniteCheckedException("Using encryption is not allowed" +
- " for not persistent cache " + cacheSpec.toString());
- }
-
- EncryptionSpi encSpi = c.getEncryptionSpi();
-
- if (encSpi == null) {
- throw new IgniteCheckedException("EncryptionSpi should be configured to use encrypted cache " +
- cacheSpec.toString());
- }
-
- if (cc.getDiskPageCompression() != DiskPageCompression.DISABLED)
- throw new IgniteCheckedException("Encryption cannot be used with disk page compression " +
- cacheSpec.toString());
- }
- }
-
- /**
* @param ctx Context.
* @return DHT managers.
*/
@@ -837,31 +550,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- /**
- * @throws IgniteCheckedException If failed.
- */
- private void restoreCacheConfigurations() throws IgniteCheckedException {
- if (ctx.isDaemon())
- return;
-
- Map<String, CacheInfo> caches = new HashMap<>();
-
- Map<String, CacheInfo> templates = new HashMap<>();
-
- addCacheOnJoinFromConfig(caches, templates);
-
- CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
- IgniteUuid.randomUuid(),
- caches,
- templates,
- startAllCachesOnClientStart()
- );
-
- localConfigs = discoData;
-
- cachesInfo.onStart(discoData);
- }
-
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public void start() throws IgniteCheckedException {
@@ -885,14 +573,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx = createSharedContext(ctx, sessionListeners);
+ locCfgMgr = new GridLocalConfigManager(this, ctx);
+
transactions = new IgniteTransactionsImpl(sharedCtx, null);
// Start shared managers.
for (GridCacheSharedManager mgr : sharedCtx.managers())
mgr.start(sharedCtx);
- if (!ctx.isDaemon() && (!CU.isPersistenceEnabled(ctx.config())) || ctx.config().isClientMode())
- restoreCacheConfigurations();
+ if (!ctx.isDaemon() && (!CU.isPersistenceEnabled(ctx.config())) || ctx.config().isClientMode()) {
+ CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();
+
+ if (data != null)
+ cachesInfo.onStart(data);
+ }
if (log.isDebugEnabled())
log.debug("Started cache processor.");
@@ -901,178 +595,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.authentication().cacheProcessorStarted();
}
- /**
- * @param cfg Cache configuration.
- * @param sql SQL flag.
- * @param caches Caches map.
- * @param templates Templates map.
- * @throws IgniteCheckedException If failed.
- */
- private void addCacheOnJoin(CacheConfiguration<?, ?> cfg, boolean sql,
- Map<String, CacheInfo> caches,
- Map<String, CacheInfo> templates) throws IgniteCheckedException {
- String cacheName = cfg.getName();
-
- CU.validateCacheName(cacheName);
-
- cloneCheckSerializable(cfg);
-
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
-
- // Initialize defaults.
- initialize(cfg, cacheObjCtx);
-
- StoredCacheData cacheData = new StoredCacheData(cfg);
-
- cacheData.sql(sql);
-
- T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter().split(cfg);
-
- cacheData.config(splitCfg.get1());
- cacheData.cacheConfigurationEnrichment(splitCfg.get2());
-
- cfg = splitCfg.get1();
-
- if (GridCacheUtils.isCacheTemplateName(cacheName))
- templates.put(cacheName, new CacheInfo(cacheData, CacheType.USER, false, 0, true));
- else {
- if (caches.containsKey(cacheName)) {
- throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
- "assign unique name to each cache): " + cacheName);
- }
-
- CacheType cacheType = cacheType(cacheName);
-
- if (cacheType != CacheType.USER && cfg.getDataRegionName() == null)
- cfg.setDataRegionName(sharedCtx.database().systemDateRegionName());
-
- addStoredCache(caches, cacheData, cacheName, cacheType, true);
- }
- }
-
- /**
- * Add stored cache data to caches storage.
- *
- * @param caches Cache storage.
- * @param cacheData Cache data to add.
- * @param cacheName Cache name.
- * @param cacheType Cache type.
- * @param isStaticalyConfigured Statically configured flag.
- */
- private void addStoredCache(Map<String, CacheInfo> caches, StoredCacheData cacheData, String cacheName,
- CacheType cacheType, boolean isStaticalyConfigured) {
- if (!caches.containsKey(cacheName)) {
- if (!cacheType.userCache())
- stopSeq.addLast(cacheName);
- else
- stopSeq.addFirst(cacheName);
- }
-
- caches.put(cacheName, new CacheInfo(cacheData, cacheType, cacheData.sql(), 0, isStaticalyConfigured));
- }
/**
- * @param caches Caches map.
- * @param templates Templates map.
- * @throws IgniteCheckedException If failed.
- */
- private void addCacheOnJoinFromConfig(
- Map<String, CacheInfo> caches,
- Map<String, CacheInfo> templates
- ) throws IgniteCheckedException {
- assert !ctx.config().isDaemon();
-
- CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
-
- for (int i = 0; i < cfgs.length; i++) {
- CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
-
- // Replace original configuration value.
- cfgs[i] = cfg;
-
- addCacheOnJoin(cfg, false, caches, templates);
- }
-
- if (CU.isPersistenceEnabled(ctx.config()) && ctx.cache().context().pageStore() != null) {
- Map<String, StoredCacheData> storedCaches = ctx.cache().context().pageStore().readCacheConfigurations();
-
- if (!F.isEmpty(storedCaches)) {
- List<String> skippedConfigs = new ArrayList<>();
-
- for (StoredCacheData storedCacheData : storedCaches.values()) {
- // Backward compatibility for old stored caches data.
- if (storedCacheData.hasOldCacheConfigurationFormat()) {
- storedCacheData = new StoredCacheData(storedCacheData);
-
- T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = splitter().split(storedCacheData.config());
-
- storedCacheData.config(splitCfg.get1());
- storedCacheData.cacheConfigurationEnrichment(splitCfg.get2());
-
- // Overwrite with new format.
- saveCacheConfiguration(storedCacheData);
- }
-
- String cacheName = storedCacheData.config().getName();
-
- CacheType type = cacheType(cacheName);
-
- if (!caches.containsKey(cacheName))
- // No static cache - add the configuration.
- addStoredCache(caches, storedCacheData, cacheName, type, false);
- else {
- // A static cache with the same name already exists.
- CacheConfiguration cfg = caches.get(cacheName).cacheData().config();
- CacheConfiguration cfgFromStore = storedCacheData.config();
-
- validateCacheConfigurationOnRestore(cfg, cfgFromStore);
-
- if (!keepStaticCacheConfiguration) {
- addStoredCache(caches, storedCacheData, cacheName, type, false);
-
- if (type == CacheType.USER)
- skippedConfigs.add(cacheName);
- }
- }
- }
-
- if (!F.isEmpty(skippedConfigs))
- U.warn(log, "Static configuration for the following caches will be ignored because a persistent " +
- "cache with the same name already exist (see " +
- "https://apacheignite.readme.io/docs/cache-configuration for more information): " +
- skippedConfigs);
- }
- }
- }
-
- /**
- * Validates cache configuration against stored cache configuration when persistence is enabled.
- *
- * @param cfg Configured cache configuration.
- * @param cfgFromStore Stored cache configuration
- * @throws IgniteCheckedException If validation failed.
+ * @param cfg Initializes cache configuration with proper defaults.
+ * @param cacheObjCtx Cache object context.
+ * @throws IgniteCheckedException If configuration is not valid.
*/
- private void validateCacheConfigurationOnRestore(CacheConfiguration cfg, CacheConfiguration cfgFromStore)
- throws IgniteCheckedException {
- assert cfg != null && cfgFromStore != null;
-
- if ((cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT ||
- cfgFromStore.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
- && cfg.getAtomicityMode() != cfgFromStore.getAtomicityMode()) {
- throw new IgniteCheckedException("Cannot start cache. Statically configured atomicity mode differs from " +
- "previously stored configuration. Please check your configuration: [cacheName=" + cfg.getName() +
- ", configuredAtomicityMode=" + cfg.getAtomicityMode() +
- ", storedAtomicityMode=" + cfgFromStore.getAtomicityMode() + "]");
- }
-
- boolean staticCfgVal = cfg.isEncryptionEnabled();
-
- boolean storedVal = cfgFromStore.isEncryptionEnabled();
+ void initialize(CacheConfiguration cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
+ CU.initializeConfigDefaults(log, cfg, cacheObjCtx);
- if (storedVal != staticCfgVal) {
- throw new IgniteCheckedException("Encrypted flag value differs. Static config value is '" + staticCfgVal +
- "' and value stored on the disk is '" + storedVal + "'");
- }
+ ctx.coordinators().preProcessCacheConfiguration(cfg);
}
/**
@@ -1093,32 +625,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param rmtNode Remote node to check.
- * @return Data storage configuration
- */
- private DataStorageConfiguration extractDataStorage(ClusterNode rmtNode) {
- return GridCacheUtils.extractDataStorage(
- rmtNode,
- ctx.marshallerContext().jdkMarshaller(),
- U.resolveClassLoader(ctx.config())
- );
- }
-
- /**
- * @param dataStorageCfg User-defined data regions.
- */
- private Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
- if(dataStorageCfg != null) {
- return Optional.ofNullable(dataStorageCfg.getDataRegionConfigurations())
- .map(Stream::of)
- .orElseGet(Stream::empty)
- .collect(Collectors.toMap(DataRegionConfiguration::getName, e -> e));
- }
-
- return Collections.emptyMap();
- }
-
- /**
* @param grpId Group ID.
* @return Cache group.
*/
@@ -1142,7 +648,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean checkConsistency = !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
if (checkConsistency)
- checkConsistency();
+ ValidationOnNodeJoinUtils.checkConsistency(ctx, log);
cachesInfo.onKernalStart(checkConsistency);
@@ -1197,33 +703,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.timeout().addTimeoutObject(new RemovedItemsCleanupTask(timeout));
}
- /**
- * @throws IgniteCheckedException if check failed.
- */
- private void checkConsistency() throws IgniteCheckedException {
- Collection<ClusterNode> rmtNodes = ctx.discovery().remoteNodes();
-
- boolean changeablePoolSize = IgniteFeatures.allNodesSupports(rmtNodes, IgniteFeatures.DIFFERENT_REBALANCE_POOL_SIZE);
-
- for (ClusterNode n : rmtNodes) {
- if (Boolean.TRUE.equals(n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)))
- continue;
-
- if(!changeablePoolSize)
- checkRebalanceConfiguration(n);
-
- checkTransactionConfiguration(n);
-
- checkMemoryConfiguration(n);
-
- DeploymentMode locDepMode = ctx.config().getDeploymentMode();
- DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
-
- CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
- locDepMode, rmtDepMode, true);
- }
- }
-
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
stopCaches(cancel);
@@ -1248,7 +727,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cancel Cancel.
*/
public void stopCaches(boolean cancel) {
- for (String cacheName : stopSeq) {
+ for (String cacheName : locCfgMgr.stopSequence()) {
GridCacheAdapter<?, ?> cache = stoppedCaches.remove(cacheName);
if (cache != null)
@@ -1315,7 +794,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
aff.cancelFutures(affErr);
}
- for (String cacheName : stopSeq) {
+ for (String cacheName : locCfgMgr.stopSequence()) {
GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
if (cache != null) {
@@ -1680,7 +1159,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
- validate(ctx.config(), cfg, desc.cacheType(), cfgStore);
+ ValidationOnNodeJoinUtils.validate(ctx.config(), cfg, desc.cacheType(), cfgStore, ctx, log, (x, y) -> {
+ try {
+ assertParameter(x, y);
+ }
+ catch (IgniteCheckedException ex) {
+ return ex;
+ }
+
+ return null;
+ });
if (pluginMgr == null)
pluginMgr = new CachePluginManager(ctx, cfg);
@@ -2131,10 +1619,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Caches to be started when this node starts.
*/
@Nullable public LocalJoinCachesContext localJoinCachesContext() {
- if (ctx.discovery().localNode().order() == 1 && localConfigs != null)
- cachesInfo.filterDynamicCacheDescriptors(localConfigs);
-
- localConfigs = null;
+ if (ctx.discovery().localNode().order() == 1 && alreadyFiltered.compareAndSet(false, true)) {
+ cachesInfo.filterDynamicCacheDescriptors(locCfgMgr.localCachesOnStart());
+ }
return cachesInfo.localJoinCachesContext();
}
@@ -3237,6 +2724,32 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param rmtNode Remote node to check.
+ * @return Data storage configuration
+ */
+ private DataStorageConfiguration extractDataStorage(ClusterNode rmtNode) {
+ return GridCacheUtils.extractDataStorage(
+ rmtNode,
+ ctx.marshallerContext().jdkMarshaller(),
+ U.resolveClassLoader(ctx.config())
+ );
+ }
+
+ /**
+ * @param dataStorageCfg User-defined data regions.
+ */
+ private Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
+ if (dataStorageCfg != null) {
+ return Optional.ofNullable(dataStorageCfg.getDataRegionConfigurations())
+ .map(Stream::of)
+ .orElseGet(Stream::empty)
+ .collect(Collectors.toMap(DataRegionConfiguration::getName, e -> e));
+ }
+
+ return Collections.emptyMap();
+ }
+
+ /**
* Force checkpoint and remove offheap checkpoint listener after it was finished.
*
* @param grpToStop Cache group to stop.
@@ -3503,7 +3016,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- cachesInfo.collectGridNodeData(dataBag);
+ cachesInfo.collectGridNodeData(dataBag, backwardCompatibleSplitter());
}
/** {@inheritDoc} */
@@ -3525,132 +3038,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if(!cachesInfo.isMergeConfigSupports(node))
return null;
- if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
- CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData();
-
- boolean isGridActive = ctx.state().clusterState().active();
-
- StringBuilder errorMsg = new StringBuilder();
-
- SecurityContext secCtx = null;
+ String validationRes = cachesInfo.validateJoiningNodeData(discoData);
- if (ctx.security().enabled()) {
- try {
- secCtx = nodeSecurityContext(marsh, U.resolveClassLoader(ctx.config()), node);
- }
- catch (SecurityException se) {
- errorMsg.append(se.getMessage());
- }
- }
-
- if (!node.isClient()) {
- validateRmtRegions(node).forEach(error -> {
- if (errorMsg.length() > 0)
- errorMsg.append("\n");
+ if (validationRes != null)
+ return new IgniteNodeValidationResult(node.id(), validationRes, validationRes);
- errorMsg.append(error);
- });
- }
-
- for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) {
- if (secCtx != null && cacheInfo.cacheType() == CacheType.USER) {
- try (OperationSecurityContext s = ctx.security().withContext(secCtx)) {
- authorizeCacheCreate(cacheInfo.cacheData().config());
- }
- catch (SecurityException ex) {
- if (errorMsg.length() > 0)
- errorMsg.append("\n");
-
- errorMsg.append(ex.getMessage());
- }
- }
-
- DynamicCacheDescriptor locDesc = cacheDescriptor(cacheInfo.cacheData().config().getName());
-
- if (locDesc == null)
- continue;
-
- QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
-
- if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) {
- if (errorMsg.length() > 0)
- errorMsg.append("\n");
-
- if (schemaPatch.hasConflicts())
- errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE,
- locDesc.cacheName(), schemaPatch.getConflictsMessage()));
- else
- errorMsg.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, locDesc.cacheName()));
- }
-
- // This check must be done on join, otherwise group encryption key will be
- // written to metastore regardless of validation check and could trigger WAL write failures.
- boolean locEnc = locDesc.cacheConfiguration().isEncryptionEnabled();
- boolean rmtEnc = cacheInfo.cacheData().config().isEncryptionEnabled();
-
- if (locEnc != rmtEnc) {
- if (errorMsg.length() > 0)
- errorMsg.append("\n");
-
- // Message will be printed on remote node, so need to swap local and remote.
- errorMsg.append(String.format(ENCRYPT_MISMATCH_MESSAGE, locDesc.cacheName(), rmtEnc, locEnc));
- }
- }
-
- if (errorMsg.length() > 0) {
- String msg = errorMsg.toString();
-
- return new IgniteNodeValidationResult(node.id(), msg);
- }
- }
-
- return null;
- }
-
- /**
- * @param rmtNode Joining node.
- * @return List of validation errors.
- */
- private List<String> validateRmtRegions(ClusterNode rmtNode) {
- List<String> errorMessages = new ArrayList<>();
-
- DataStorageConfiguration rmtStorageCfg = extractDataStorage(rmtNode);
- Map<String, DataRegionConfiguration> rmtRegionCfgs = dataRegionCfgs(rmtStorageCfg);
-
- DataStorageConfiguration locStorageCfg = ctx.config().getDataStorageConfiguration();
-
- if (isDefaultDataRegionPersistent(locStorageCfg) != isDefaultDataRegionPersistent(rmtStorageCfg)) {
- errorMessages.add(String.format(
- INVALID_REGION_CONFIGURATION_MESSAGE,
- "DEFAULT",
- ctx.localNodeId(),
- isDefaultDataRegionPersistent(locStorageCfg),
- rmtNode.id(),
- isDefaultDataRegionPersistent(rmtStorageCfg)
- ));
- }
-
- for (ClusterNode clusterNode : ctx.discovery().aliveServerNodes()) {
- Map<String, DataRegionConfiguration> nodeRegionCfg = dataRegionCfgs(extractDataStorage(clusterNode));
-
- for (Map.Entry<String, DataRegionConfiguration> nodeRegionCfgEntry : nodeRegionCfg.entrySet()) {
- String regionName = nodeRegionCfgEntry.getKey();
-
- DataRegionConfiguration rmtRegionCfg = rmtRegionCfgs.get(regionName);
-
- if (rmtRegionCfg != null && rmtRegionCfg.isPersistenceEnabled() != nodeRegionCfgEntry.getValue().isPersistenceEnabled())
- errorMessages.add(String.format(
- INVALID_REGION_CONFIGURATION_MESSAGE,
- regionName,
- ctx.localNodeId(),
- nodeRegionCfgEntry.getValue().isPersistenceEnabled(),
- rmtNode.id(),
- rmtRegionCfg.isPersistenceEnabled()
- ));
- }
- }
-
- return errorMessages;
+ return ValidationOnNodeJoinUtils.validateNode(node, discoData, marsh, ctx, this::cacheDescriptor);
}
/**
@@ -3836,13 +3229,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @return {@code True} if need locally start all existing caches on client node start.
- */
- private boolean startAllCachesOnClientStart() {
- return startClientCaches && ctx.clientNode();
- }
-
- /**
* Dynamically starts cache using template configuration.
*
* @param cacheName Cache name.
@@ -4297,7 +3683,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/** Resolve cache type for input cacheType */
- @NotNull private CacheType resolveCacheType(CacheConfiguration ccfg) {
+ private @NotNull CacheType resolveCacheType(CacheConfiguration ccfg) {
if (CU.isUtilityCache(ccfg.getName()))
return CacheType.UTILITY;
else if (internalCaches.contains(ccfg.getName()))
@@ -4384,7 +3770,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param destroy Cache data destroy flag. Setting to {@code true} will cause removing all cache data from store.
* @return Future that will be completed when cache is destroyed.
*/
- @NotNull public DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) {
+ public @NotNull DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) {
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false, true);
req.stop(true);
@@ -4401,7 +3787,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param reqs cache stop requests.
* @return compound future.
*/
- @NotNull public IgniteInternalFuture<?> dynamicChangeCaches(List<DynamicCacheChangeRequest> reqs) {
+ public @NotNull IgniteInternalFuture<?> dynamicChangeCaches(List<DynamicCacheChangeRequest> reqs) {
GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs))
@@ -4522,22 +3908,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public void saveCacheConfiguration(DynamicCacheDescriptor desc) throws IgniteCheckedException {
assert desc != null;
- if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() &&
- isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration()))
- sharedCtx.pageStore().storeCacheData(desc.toStoredData(splitter), true);
+ locCfgMgr.saveCacheConfiguration(desc.toStoredData(splitter), true);
}
/**
* Save cache configuration to persistent store if necessary.
*
* @param storedCacheData Stored cache data.
+ * @param overwrite Overwrite existing.
*/
- public void saveCacheConfiguration(StoredCacheData storedCacheData) throws IgniteCheckedException {
+ public void saveCacheConfiguration(StoredCacheData storedCacheData, boolean overwrite) throws IgniteCheckedException {
assert storedCacheData != null;
- if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() &&
- isPersistentCache(storedCacheData.config(), sharedCtx.gridConfig().getDataStorageConfiguration()))
- sharedCtx.pageStore().storeCacheData(storedCacheData, true);
+ locCfgMgr.saveCacheConfiguration(storedCacheData, overwrite);
}
/**
@@ -4635,8 +4018,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Authorize creating cache.
*
* @param cfg Cache configuration.
+ * @param ctx Kernal context.
*/
- private void authorizeCacheCreate(CacheConfiguration cfg) {
+ static void authorizeCacheCreate(CacheConfiguration cfg, GridKernalContext ctx) {
if(cfg != null) {
ctx.security().authorize(cfg.getName(), SecurityPermission.CACHE_CREATE);
@@ -4656,14 +4040,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (req.stop())
ctx.security().authorize(req.cacheName(), SecurityPermission.CACHE_DESTROY);
else
- authorizeCacheCreate(req.startCacheConfiguration());
+ authorizeCacheCreate(req.startCacheConfiguration(), ctx);
}
}
/**
* @return Non null exception if node is stopping or disconnected.
*/
- @Nullable private IgniteCheckedException checkNodeState() {
+ private @Nullable IgniteCheckedException checkNodeState() {
if (ctx.isStopping()) {
return new IgniteCheckedException("Failed to execute dynamic cache change request, " +
"node is stopping.");
@@ -4747,42 +4131,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return false;
}
- /**
- * Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode.
- *
- * @param cfgs Caches.
- * @return Maximum detected preload order.
- * @throws IgniteCheckedException If validation failed.
- */
- private int validatePreloadOrder(CacheConfiguration[] cfgs) throws IgniteCheckedException {
- int maxOrder = 0;
-
- for (CacheConfiguration cfg : cfgs) {
- int rebalanceOrder = cfg.getRebalanceOrder();
-
- if (rebalanceOrder > 0) {
- if (cfg.getCacheMode() == LOCAL)
- throw new IgniteCheckedException("Rebalance order set for local cache (fix configuration and restart the " +
- "node): " + U.maskName(cfg.getName()));
-
- if (cfg.getRebalanceMode() == CacheRebalanceMode.NONE)
- throw new IgniteCheckedException("Only caches with SYNC or ASYNC rebalance mode can be set as rebalance " +
- "dependency for other caches [cacheName=" + U.maskName(cfg.getName()) +
- ", rebalanceMode=" + cfg.getRebalanceMode() + ", rebalanceOrder=" + cfg.getRebalanceOrder() + ']');
-
- maxOrder = Math.max(maxOrder, rebalanceOrder);
- }
- else if (rebalanceOrder < 0)
- throw new IgniteCheckedException("Rebalance order cannot be negative for cache (fix configuration and restart " +
- "the node) [cacheName=" + cfg.getName() + ", rebalanceOrder=" + rebalanceOrder + ']');
- }
-
- return maxOrder;
- }
-
/** {@inheritDoc} */
- @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
- IgniteNodeValidationResult res = validateHashIdResolvers(node);
+ @Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode node) {
+ IgniteNodeValidationResult res = validateHashIdResolvers(node, ctx, cacheDescriptors());
if (res == null)
res = validateRestartingCaches(node);
@@ -4814,145 +4165,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param node Joining node.
- * @return Validation result or {@code null} in case of success.
- */
- @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) {
- if (!node.isClient()) {
- for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
- CacheConfiguration cfg = desc.cacheConfiguration();
-
- if (cfg.getAffinity() instanceof RendezvousAffinityFunction) {
- RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cfg.getAffinity();
-
- Object nodeHashObj = aff.resolveNodeHash(node);
-
- for (ClusterNode topNode : ctx.discovery().aliveServerNodes()) {
- Object topNodeHashObj = aff.resolveNodeHash(topNode);
-
- if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) {
- String errMsg = "Failed to add node to topology because it has the same hash code for " +
- "partitioned affinity as one of existing nodes [cacheName=" +
- cfg.getName() + ", existingNodeId=" + topNode.id() + ']';
-
- String sndMsg = "Failed to add node to topology because it has the same hash code for " +
- "partitioned affinity as one of existing nodes [cacheName=" +
- cfg.getName() + ", existingNodeId=" + topNode.id() + ']';
-
- return new IgniteNodeValidationResult(topNode.id(), errMsg, sndMsg);
- }
- }
- }
- }
- }
-
- return null;
- }
-
- /**
- * @param rmt Remote node to check.
- * @throws IgniteCheckedException If check failed.
- */
- private void checkTransactionConfiguration(ClusterNode rmt) throws IgniteCheckedException {
- TransactionConfiguration rmtTxCfg = rmt.attribute(ATTR_TX_CONFIG);
-
- if (rmtTxCfg != null) {
- TransactionConfiguration locTxCfg = ctx.config().getTransactionConfiguration();
-
- checkDeadlockDetectionConfig(rmt, rmtTxCfg, locTxCfg);
-
- checkSerializableEnabledConfig(rmt, rmtTxCfg, locTxCfg);
- }
- }
-
- /** */
- private void checkDeadlockDetectionConfig(ClusterNode rmt, TransactionConfiguration rmtTxCfg,
- TransactionConfiguration locTxCfg) {
- boolean locDeadlockDetectionEnabled = locTxCfg.getDeadlockTimeout() > 0;
- boolean rmtDeadlockDetectionEnabled = rmtTxCfg.getDeadlockTimeout() > 0;
-
- if (locDeadlockDetectionEnabled != rmtDeadlockDetectionEnabled) {
- U.warn(log, "Deadlock detection is enabled on one node and disabled on another. " +
- "Disabled detection on one node can lead to undetected deadlocks. [rmtNodeId=" + rmt.id() +
- ", locDeadlockTimeout=" + locTxCfg.getDeadlockTimeout() +
- ", rmtDeadlockTimeout=" + rmtTxCfg.getDeadlockTimeout());
- }
- }
-
- /** */
- private void checkSerializableEnabledConfig(ClusterNode rmt, TransactionConfiguration rmtTxCfg,
- TransactionConfiguration locTxCfg) throws IgniteCheckedException {
- if (locTxCfg.isTxSerializableEnabled() != rmtTxCfg.isTxSerializableEnabled())
- throw new IgniteCheckedException("Serializable transactions enabled mismatch " +
- "(fix txSerializableEnabled property or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true " +
- "system property) [rmtNodeId=" + rmt.id() +
- ", locTxSerializableEnabled=" + locTxCfg.isTxSerializableEnabled() +
- ", rmtTxSerializableEnabled=" + rmtTxCfg.isTxSerializableEnabled() + ']');
- }
-
- /**
- * @param rmt Remote node to check.
- * @throws IgniteCheckedException If check failed.
- */
- private void checkMemoryConfiguration(ClusterNode rmt) throws IgniteCheckedException {
- ClusterNode locNode = ctx.discovery().localNode();
-
- if (ctx.config().isClientMode() || locNode.isDaemon() || rmt.isClient() || rmt.isDaemon())
- return;
-
- DataStorageConfiguration dsCfg = null;
-
- Object dsCfgBytes = rmt.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG);
-
- if (dsCfgBytes instanceof byte[])
- dsCfg = new JdkMarshaller().unmarshal((byte[])dsCfgBytes, U.resolveClassLoader(ctx.config()));
-
- if (dsCfg == null) {
- // Try to use legacy memory configuration.
- MemoryConfiguration memCfg = rmt.attribute(IgniteNodeAttributes.ATTR_MEMORY_CONFIG);
-
- if (memCfg != null) {
- dsCfg = new DataStorageConfiguration();
-
- // All properties that are used in validation should be converted here.
- dsCfg.setPageSize(memCfg.getPageSize());
- }
- }
-
- if (dsCfg != null) {
- DataStorageConfiguration locDsCfg = ctx.config().getDataStorageConfiguration();
-
- if (dsCfg.getPageSize() != locDsCfg.getPageSize()) {
- throw new IgniteCheckedException("Memory configuration mismatch (fix configuration or set -D" +
- IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property) [rmtNodeId=" + rmt.id() +
- ", locPageSize = " + locDsCfg.getPageSize() + ", rmtPageSize = " + dsCfg.getPageSize() + "]");
- }
- }
- }
-
- /**
- * @param rmt Remote node to check.
- * @throws IgniteCheckedException If check failed.
- */
- private void checkRebalanceConfiguration(ClusterNode rmt) throws IgniteCheckedException {
- ClusterNode locNode = ctx.discovery().localNode();
-
- if (ctx.config().isClientMode() || locNode.isDaemon() || rmt.isClient() || rmt.isDaemon())
- return;
-
- Integer rebalanceThreadPoolSize = rmt.attribute(IgniteNodeAttributes.ATTR_REBALANCE_POOL_SIZE);
-
- if (rebalanceThreadPoolSize != null && rebalanceThreadPoolSize != ctx.config().getRebalanceThreadPoolSize()) {
- throw new IgniteCheckedException("Rebalance configuration mismatch (fix configuration or set -D" +
- IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)." +
- " Different values of such parameter may lead to rebalance process instability and hanging. " +
- " [rmtNodeId=" + rmt.id() +
- ", locRebalanceThreadPoolSize = " + ctx.config().getRebalanceThreadPoolSize() +
- ", rmtRebalanceThreadPoolSize = " + rebalanceThreadPoolSize + "]");
- }
- }
-
- /**
* @param cfg Cache configuration.
* @return Query manager.
*/
@@ -5072,23 +4284,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return All configured cache instances.
*/
public Collection<IgniteInternalCache<?, ?>> caches() {
- return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxy<?, ?>,
- IgniteInternalCache<?, ?>>() {
- @Override public IgniteInternalCache<?, ?> apply(IgniteCacheProxy<?, ?> entries) {
- return entries.internalProxy();
- }
- });
+ return F.viewReadOnly(jCacheProxies.values(),
+ (IgniteClosure<IgniteCacheProxy<?, ?>, IgniteInternalCache<?, ?>>)IgniteCacheProxy::internalProxy);
}
/**
* @return All configured cache instances.
*/
public Collection<IgniteCacheProxy<?, ?>> jcaches() {
- return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxyImpl<?, ?>, IgniteCacheProxy<?, ?>>() {
- @Override public IgniteCacheProxy<?, ?> apply(IgniteCacheProxyImpl<?, ?> proxy) {
- return proxy.gatewayWrapper();
- }
- });
+ return F.viewReadOnly(jCacheProxies.values(),
+ (IgniteClosure<IgniteCacheProxyImpl<?, ?>, IgniteCacheProxy<?, ?>>)IgniteCacheProxyImpl::gatewayWrapper);
}
/**
@@ -5178,7 +4383,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName,
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ public @Nullable <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName,
boolean failIfNotStarted,
boolean checkThreadTx) throws IgniteCheckedException {
assert cacheName != null;
@@ -5296,7 +4502,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cacheId Cache ID.
* @return Cache descriptor.
*/
- @Nullable public DynamicCacheDescriptor cacheDescriptor(int cacheId) {
+ public @Nullable DynamicCacheDescriptor cacheDescriptor(int cacheId) {
for (DynamicCacheDescriptor cacheDesc : cacheDescriptors().values()) {
CacheConfiguration ccfg = cacheDesc.cacheConfiguration();
@@ -5323,7 +4529,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (desc != null)
return;
- DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg);
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg,
+ backwardCompatibleSplitter().split(cacheCfg));
TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId());
@@ -5393,7 +4600,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param awaitInit Await proxy initialization.
* @return Cache proxy.
*/
- @Nullable public IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, boolean awaitInit) {
+ public @Nullable IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, boolean awaitInit) {
IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(name);
if (awaitInit)
@@ -5407,7 +4614,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param proxy Cache proxy.
* @return Previous cache proxy.
*/
- @Nullable public IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, IgniteCacheProxyImpl<?, ?> proxy) {
+ public @Nullable IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, IgniteCacheProxyImpl<?, ?> proxy) {
return jCacheProxies.putIfAbsent(name, proxy);
}
@@ -5666,7 +4873,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Configuration copy.
* @throws IgniteCheckedException If validation failed.
*/
- private CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
+ CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
if (val == null)
return null;
@@ -6015,7 +5222,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param oldFormat Old format.
*/
- public CacheConfigurationSplitter splitter(boolean oldFormat) {
+ private CacheConfigurationSplitter splitter(boolean oldFormat) {
// Requesting splitter with old format support is rare operation.
// It's acceptable to allocate it every time by request.
return oldFormat ? new CacheConfigurationSplitterOldFormat(enricher) : splitter;
@@ -6034,7 +5241,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*
* @return Cache configuration splitter with or without old format support depending on cluster state.
*/
- public CacheConfigurationSplitter backwardCompatibleSplitter() {
+ private CacheConfigurationSplitter backwardCompatibleSplitter() {
IgniteDiscoverySpi spi = (IgniteDiscoverySpi) ctx.discovery().getInjectedDiscoverySpi();
boolean oldFormat = !spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);
@@ -6072,7 +5279,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
- restoreCacheConfigurations();
+ CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();
+
+ cachesInfo.onStart(data);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
new file mode 100644
index 0000000..c59b3fb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
+
+/**
+ * Responsible for restoring local cache configurations (both from static configuration and persistence).
+ * Also keeps the stop sequence of caches and the set of caches that were present on the node before it joined.
+ */
+public class GridLocalConfigManager {
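+ /** Value of {@link IgniteSystemProperties#IGNITE_START_CACHES_ON_JOIN}, read once at construction ({@code false} by default). */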
+ private final boolean startClientCaches =
+ IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false);
+
+ /** Caches stop sequence. */
+ private final Deque<String> stopSeq = new LinkedList<>();
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Node's local caches on start (both from static configuration and from persistent caches). */
+ private Set<String> localCachesOnStart;
+
+ /** Cache processor. */
+ private final GridCacheProcessor cacheProcessor;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /**
+ * @param cacheProcessor Cache processor.
+ * @param kernalCtx Kernal context.
+ */
+ public GridLocalConfigManager(
+ GridCacheProcessor cacheProcessor,
+ GridKernalContext kernalCtx
+ ) {
+ this.cacheProcessor = cacheProcessor;
+ ctx = kernalCtx;
+ log = ctx.log(getClass());
+ }
+
+ /**
+ * Saves cache configuration to the page store if one exists, this node is not a client and the cache is persistent.
+ *
+ * @param storedCacheData Stored cache data.
+ * @param overwrite Overwrite existing.
+ */
+ public void saveCacheConfiguration(StoredCacheData storedCacheData, boolean overwrite) throws IgniteCheckedException {
+ assert storedCacheData != null;
+
+ GridCacheSharedContext<Object, Object> sharedContext = cacheProcessor.context();
+
+ if (sharedContext.pageStore() != null
+ && !sharedContext.kernalContext().clientNode()
+ && isPersistentCache(storedCacheData.config(), sharedContext.gridConfig().getDataStorageConfiguration()))
+ sharedContext.pageStore().storeCacheData(storedCacheData, overwrite);
+ }
+
+ /**
+ * @return Caches stop sequence (the internal collection itself, not a copy).
+ */
+ public Collection<String> stopSequence() {
+ return stopSeq;
+ }
+
+ /**
+ * @return Names of node's local caches on start; {@code null} until {@link #restoreCacheConfigurations()} sets it.
+ */
+ public Set<String> localCachesOnStart() {
+ return localCachesOnStart;
+ }
+
+ /** Collects local caches and templates into join discovery data; returns {@code null} on a daemon node.
+ * @throws IgniteCheckedException If failed.
+ */
+ public CacheJoinNodeDiscoveryData restoreCacheConfigurations() throws IgniteCheckedException {
+ if (ctx.isDaemon())
+ return null;
+
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>();
+
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>();
+
+ restoreCaches(caches, templates, ctx.config(), ctx.cache().context().pageStore());
+
+ CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(
+ IgniteUuid.randomUuid(),
+ caches,
+ templates,
+ startAllCachesOnClientStart()
+ );
+
+ localCachesOnStart = new HashSet<>(discoData.caches().keySet());
+
+ return discoData;
+ }
+
+ /**
+ * @return {@code True} if need locally start all existing caches on client node start.
+ */
+ private boolean startAllCachesOnClientStart() {
+ return startClientCaches && ctx.clientNode();
+ }
+
+ /**
+ * @param caches Caches accumulator.
+ * @param templates Templates accumulator.
+ * @param config Ignite configuration.
+ * @param pageStoreManager Page store manager.
+ */
+ private void restoreCaches(
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
+ IgniteConfiguration config,
+ IgnitePageStoreManager pageStoreManager
+ ) throws IgniteCheckedException {
+ assert !config.isDaemon() : "Trying to restore cache configurations on daemon node.";
+
+ CacheConfiguration[] cfgs = config.getCacheConfiguration();
+
+ for (int i = 0; i < cfgs.length; i++) {
+ CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
+
+ // Replace original configuration value.
+ cfgs[i] = cfg;
+
+ addCacheFromConfiguration(cfg, false, caches, templates);
+ }
+
+ if (CU.isPersistenceEnabled(config) && pageStoreManager != null) {
+ Map<String, StoredCacheData> storedCaches = pageStoreManager.readCacheConfigurations();
+
+ if (!F.isEmpty(storedCaches)) {
+ List<String> skippedConfigs = new ArrayList<>();
+
+ for (StoredCacheData storedCacheData : storedCaches.values()) {
+ // Backward compatibility for old stored caches data.
+ if (storedCacheData.hasOldCacheConfigurationFormat()) {
+ storedCacheData = new StoredCacheData(storedCacheData);
+
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg =
+ cacheProcessor.splitter().split(storedCacheData.config());
+
+ storedCacheData.config(splitCfg.get1());
+ storedCacheData.cacheConfigurationEnrichment(splitCfg.get2());
+
+ // Overwrite with new format.
+ saveCacheConfiguration(storedCacheData, true);
+ }
+
+ String cacheName = storedCacheData.config().getName();
+
+ CacheType type = ctx.cache().cacheType(cacheName);
+
+ if (!caches.containsKey(cacheName))
+ // No static cache - add the configuration.
+ addStoredCache(caches, storedCacheData, cacheName, type, true, false);
+ else {
+ // A static cache with the same name already exists.
+ CacheConfiguration cfg = caches.get(cacheName).cacheData().config();
+ CacheConfiguration cfgFromStore = storedCacheData.config();
+
+ validateCacheConfigurationOnRestore(cfg, cfgFromStore);
+
+ addStoredCache(caches, storedCacheData, cacheName, type, true,
+ cacheProcessor.keepStaticCacheConfiguration());
+
+ if (!cacheProcessor.keepStaticCacheConfiguration() && type == CacheType.USER)
+ skippedConfigs.add(cacheName);
+
+ }
+ }
+
+ if (!F.isEmpty(skippedConfigs)) {
+ U.warn(log, "Static configuration for the following caches will be ignored because a persistent " +
+ "cache with the same name already exist (see " +
+ "https://apacheignite.readme.io/docs/cache-configuration for more information): " +
+ skippedConfigs);
+ }
+ }
+ }
+ }
+
+ /**
+ * Adds stored cache data to the caches map; a name not yet in the map also joins the stop sequence (user caches first).
+ * @param caches Cache storage.
+ * @param cacheData Cache data to add.
+ * @param cacheName Cache name.
+ * @param cacheType Cache type.
+ * @param persistedBefore {@code true} for data read from the persistent store; passed to the cache info as 1 or 0.
+ * @param isStaticallyConfigured Statically configured flag.
+ */
+ private void addStoredCache(
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ StoredCacheData cacheData,
+ String cacheName,
+ CacheType cacheType,
+ boolean persistedBefore,
+ boolean isStaticallyConfigured
+ ) {
+ if (!caches.containsKey(cacheName)) {
+ if (!cacheType.userCache())
+ stopSeq.addLast(cacheName);
+ else
+ stopSeq.addFirst(cacheName);
+ }
+
+ caches.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, cacheType, cacheData.sql(),
+ persistedBefore ? 1 : 0, isStaticallyConfigured));
+ }
+
+ /**
+ * @param cfg Cache configuration.
+ * @param sql SQL flag.
+ * @param caches Caches map.
+ * @param templates Templates map.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void addCacheFromConfiguration(
+ CacheConfiguration<?, ?> cfg,
+ boolean sql,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+ ) throws IgniteCheckedException {
+ String cacheName = cfg.getName();
+
+ CU.validateCacheName(cacheName);
+
+ cacheProcessor.cloneCheckSerializable(cfg);
+
+ CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
+
+ // Initialize defaults.
+ cacheProcessor.initialize(cfg, cacheObjCtx);
+
+ StoredCacheData cacheData = new StoredCacheData(cfg);
+
+ cacheData.sql(sql);
+
+ T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cacheProcessor.splitter().split(cfg);
+
+ cacheData.config(splitCfg.get1());
+ cacheData.cacheConfigurationEnrichment(splitCfg.get2());
+
+ cfg = splitCfg.get1();
+
+ if (GridCacheUtils.isCacheTemplateName(cacheName))
+ templates.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, CacheType.USER, false, 0, true));
+ else {
+ if (caches.containsKey(cacheName)) {
+ throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
+ "assign unique name to each cache): " + cacheName);
+ }
+
+ CacheType cacheType = ctx.cache().cacheType(cacheName);
+
+ if (cacheType != CacheType.USER && cfg.getDataRegionName() == null)
+ cfg.setDataRegionName(cacheProcessor.context().database().systemDateRegionName());
+
+ addStoredCache(caches, cacheData, cacheName, cacheType, false, true);
+ }
+ }
+
+ /**
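+ * Validates cache configuration against stored cache configuration when persistence is enabled: atomicity modes
+ * must match if either one is TRANSACTIONAL_SNAPSHOT, and the encryption flags must be equal.
+ * @param cfg Configured cache configuration.
+ * @param cfgFromStore Stored cache configuration.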
+ * @throws IgniteCheckedException If validation failed.
+ */
+ private void validateCacheConfigurationOnRestore(CacheConfiguration cfg, CacheConfiguration cfgFromStore)
+ throws IgniteCheckedException {
+ assert cfg != null && cfgFromStore != null;
+
+ if ((cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT ||
+ cfgFromStore.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
+ && cfg.getAtomicityMode() != cfgFromStore.getAtomicityMode()) {
+ throw new IgniteCheckedException("Cannot start cache. Statically configured atomicity mode differs from " +
+ "previously stored configuration. Please check your configuration: [cacheName=" + cfg.getName() +
+ ", configuredAtomicityMode=" + cfg.getAtomicityMode() +
+ ", storedAtomicityMode=" + cfgFromStore.getAtomicityMode() + "]");
+ }
+
+ boolean staticCfgVal = cfg.isEncryptionEnabled();
+
+ boolean storedVal = cfgFromStore.isEncryptionEnabled();
+
+ if (storedVal != staticCfgVal) {
+ throw new IgniteCheckedException("Encrypted flag value differs. Static config value is '" + staticCfgVal +
+ "' and value stored on the disk is '" + storedVal + "'");
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
new file mode 100644
index 0000000..f275f28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.expiry.EternalExpiryPolicy;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.security.OperationSecurityContext;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.security.SecurityException;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
+import org.apache.ignite.spi.indexing.IndexingSpi;
+import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
+import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isDefaultDataRegionPersistent;
+import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeSecurityContext;
+
+/**
+ * Util class for joining node validation.
+ */
+public class ValidationOnNodeJoinUtils {
+ /** Message template for conflicts found during configuration merge. */
+ private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE =
+ "Conflicts during configuration merge for cache '%s' : \n%s";
+
+ /** Message template for a node join that failed because it would require a configuration merge. */
+ private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " +
+ "(the config of the cache '%s' has to be merged which is impossible on active grid). " +
+ "Deactivate grid and retry node join or clean the joining node.";
+
+ /** Template of message of failed node join because encryption settings are different for the same cache. */
+ private static final String ENCRYPT_MISMATCH_MESSAGE = "Failed to join node to the cluster " +
+ "(encryption settings are different for cache '%s' : local=%s, remote=%s.)";
+
+ /** Version since which non-default precision and scale are supported for DECIMAL and VARCHAR types. */
+ private static final IgniteProductVersion PRECISION_SCALE_SINCE_VER = IgniteProductVersion.fromString("2.7.0");
+
+ /** Invalid region configuration message. */
+ private static final String INVALID_REGION_CONFIGURATION_MESSAGE = "Failed to join node " +
+ "(Incompatible data region configuration [region=%s, locNodeId=%s, isPersistenceEnabled=%s, rmtNodeId=%s, isPersistenceEnabled=%s])";
+
+ /**
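+ * Checks configuration consistency of a joining node.
+ * Returns {@code null} if no errors were collected or the node carries no cache join data to check.
+ * @param node Joining node.
+ * @param discoData Joining node discovery data.
+ * @param marsh Marshaller.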
+ * @param ctx Context.
+ * @param cacheDescProvider Cache descriptor provider.
+ */
+ @Nullable static IgniteNodeValidationResult validateNode(
+ ClusterNode node,
+ DiscoveryDataBag.JoiningNodeDiscoveryData discoData,
+ Marshaller marsh,
+ GridKernalContext ctx,
+ Function<String, DynamicCacheDescriptor> cacheDescProvider
+ ) {
+ if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
+ CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData();
+
+ boolean isGridActive = ctx.state().clusterState().active();
+
+ StringBuilder errorMsg = new StringBuilder();
+
+ if (!node.isClient()) {
+ validateRmtRegions(node, ctx).forEach(error -> {
+ if (errorMsg.length() > 0)
+ errorMsg.append("\n");
+
+ errorMsg.append(error);
+ });
+ }
+
+ SecurityContext secCtx = null;
+
+ if (ctx.security().enabled()) {
+ try {
+ secCtx = nodeSecurityContext(marsh, U.resolveClassLoader(ctx.config()), node);
+ }
+ catch (SecurityException se) {
+ errorMsg.append(se.getMessage());
+ }
+ }
+
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) {
+ if (secCtx != null && cacheInfo.cacheType() == CacheType.USER) {
+ try (OperationSecurityContext s = ctx.security().withContext(secCtx)) {
+ GridCacheProcessor.authorizeCacheCreate(cacheInfo.cacheData().config(), ctx);
+ }
+ catch (SecurityException ex) {
+ if (errorMsg.length() > 0)
+ errorMsg.append("\n");
+
+ errorMsg.append(ex.getMessage());
+ }
+ }
+
+ DynamicCacheDescriptor locDesc = cacheDescProvider.apply(cacheInfo.cacheData().config().getName());
+
+ if (locDesc == null)
+ continue;
+
+ QuerySchemaPatch schemaPatch = locDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities());
+
+ if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) {
+ if (errorMsg.length() > 0)
+ errorMsg.append("\n");
+
+ if (schemaPatch.hasConflicts())
+ errorMsg.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE,
+ locDesc.cacheName(), schemaPatch.getConflictsMessage()));
+ else
+ errorMsg.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, locDesc.cacheName()));
+ }
+
+ // This check must be done on join, otherwise group encryption key will be
+ // written to metastore regardless of validation check and could trigger WAL write failures.
+ boolean locEnc = locDesc.cacheConfiguration().isEncryptionEnabled();
+ boolean rmtEnc = cacheInfo.cacheData().config().isEncryptionEnabled();
+
+ if (locEnc != rmtEnc) {
+ if (errorMsg.length() > 0)
+ errorMsg.append("\n");
+
+ // Message will be printed on remote node, so need to swap local and remote.
+ errorMsg.append(String.format(ENCRYPT_MISMATCH_MESSAGE, locDesc.cacheName(), rmtEnc, locEnc));
+ }
+ }
+
+ if (errorMsg.length() > 0) {
+ String msg = errorMsg.toString();
+
+ return new IgniteNodeValidationResult(node.id(), msg);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @param c Ignite configuration.
+ * @param cc Configuration to validate.
+ * @param cacheType Cache type.
+ * @param cfgStore Cache store.
+ * @param ctx Context.
+ * @param log Logger.
+ * @throws IgniteCheckedException If failed.
+ */
+ static void validate(
+ IgniteConfiguration c,
+ CacheConfiguration cc,
+ CacheType cacheType,
+ @Nullable CacheStore cfgStore,
+ GridKernalContext ctx,
+ IgniteLogger log,
+ BiFunction<Boolean, String, IgniteCheckedException> assertParam
+ ) throws IgniteCheckedException {
+ apply(assertParam, cc.getName() != null && !cc.getName().isEmpty(), "name is null or empty");
+
+ if (cc.getCacheMode() == REPLICATED) {
+ if (cc.getNearConfiguration() != null &&
+ ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
+ U.warn(log, "Near cache cannot be used with REPLICATED cache, " +
+ "will be ignored [cacheName=" + U.maskName(cc.getName()) + ']');
+
+ cc.setNearConfiguration(null);
+ }
+ }
+
+ if (storesLocallyOnClient(c, cc, ctx))
+ throw new IgniteCheckedException("DataRegion for client caches must be explicitly configured " +
+ "on client node startup. Use DataStorageConfiguration to configure DataRegion.");
+
+ if (cc.getCacheMode() == LOCAL && !cc.getAffinity().getClass().equals(GridCacheProcessor.LocalAffinityFunction.class))
+ U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" +
+ U.maskName(cc.getName()) + ']');
+
+ if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
+ throw new IgniteCheckedException("Cannot have more than " + CacheConfiguration.MAX_PARTITIONS_COUNT +
+ " partitions [cacheName=" + cc.getName() + ", partitions=" + cc.getAffinity().partitions() + ']');
+
+ if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
+ apply(assertParam, cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0");
+ apply(assertParam, cc.getRebalanceTimeout() >= 0, "rebalanceTimeout >= 0");
+ apply(assertParam, cc.getRebalanceThrottle() >= 0, "rebalanceThrottle >= 0");
+ apply(assertParam, cc.getRebalanceBatchesPrefetchCount() > 0, "rebalanceBatchesPrefetchCount > 0");
+ }
+
+ if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) {
+ if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC)
+ U.warn(log, "Cache write synchronization mode is set to FULL_ASYNC. All single-key 'put' and " +
+ "'remove' operations will return 'null', all 'putx' and 'removex' operations will return" +
+ " 'true' [cacheName=" + U.maskName(cc.getName()) + ']');
+ }
+
+ DeploymentMode depMode = c.getDeploymentMode();
+
+ if (c.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) &&
+ !CU.isSystemCache(cc.getName()) && !(c.getMarshaller() instanceof BinaryMarshaller))
+ throw new IgniteCheckedException("Cache can be started in PRIVATE or ISOLATED deployment mode only when" +
+ " BinaryMarshaller is used [depMode=" + ctx.config().getDeploymentMode() + ", marshaller=" +
+ c.getMarshaller().getClass().getName() + ']');
+
+ if (cc.getAffinity().partitions() > CacheConfiguration.MAX_PARTITIONS_COUNT)
+ throw new IgniteCheckedException("Affinity function must return at most " +
+ CacheConfiguration.MAX_PARTITIONS_COUNT + " partitions [actual=" + cc.getAffinity().partitions() +
+ ", affFunction=" + cc.getAffinity() + ", cacheName=" + cc.getName() + ']');
+
+ if (cc.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
+ apply(assertParam, cc.getCacheMode() != LOCAL,
+ "LOCAL cache mode cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+ apply(assertParam, cc.getNearConfiguration() == null,
+ "near cache cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+ apply(assertParam, !cc.isReadThrough(),
+ "readThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+ apply(assertParam, !cc.isWriteThrough(),
+ "writeThrough cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+ apply(assertParam, !cc.isWriteBehindEnabled(),
+ "writeBehindEnabled cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+ apply(assertParam, cc.getRebalanceMode() != NONE,
+ "Rebalance mode NONE cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+ ExpiryPolicy expPlc = null;
+
+ if (cc.getExpiryPolicyFactory() instanceof FactoryBuilder.SingletonFactory)
+ expPlc = (ExpiryPolicy)cc.getExpiryPolicyFactory().create();
+
+ if (!(expPlc instanceof EternalExpiryPolicy)) {
+ apply(assertParam, cc.getExpiryPolicyFactory() == null,
+ "expiry policy cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+ }
+
+ apply(assertParam, cc.getInterceptor() == null,
+ "interceptor cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+
+ // Disable in-memory evictions for mvcc cache. TODO IGNITE-10738
+ String memPlcName = cc.getDataRegionName();
+ DataRegion dataRegion = ctx.cache().context().database().dataRegion(memPlcName);
+
+ if (dataRegion != null && !dataRegion.config().isPersistenceEnabled() &&
+ dataRegion.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED) {
+ throw new IgniteCheckedException("Data pages evictions cannot be used with TRANSACTIONAL_SNAPSHOT " +
+ "cache atomicity mode for in-memory regions. Please, either disable evictions or enable " +
+ "persistence for data regions with TRANSACTIONAL_SNAPSHOT caches. [cacheName=" + cc.getName() +
+ ", dataRegionName=" + memPlcName + ", pageEvictionMode=" +
+ dataRegion.config().getPageEvictionMode() + ']');
+ }
+
+ IndexingSpi idxSpi = ctx.config().getIndexingSpi();
+
+ apply(assertParam, idxSpi == null || idxSpi instanceof NoopIndexingSpi,
+ "Custom IndexingSpi cannot be used with TRANSACTIONAL_SNAPSHOT atomicity mode");
+ }
+
+ if (cc.isWriteBehindEnabled() && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
+ if (cfgStore == null)
+ throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " +
+ "for cache: " + U.maskName(cc.getName()));
+
+ apply(assertParam, cc.getWriteBehindBatchSize() > 0, "writeBehindBatchSize > 0");
+ apply(assertParam, cc.getWriteBehindFlushSize() >= 0, "writeBehindFlushSize >= 0");
+ apply(assertParam, cc.getWriteBehindFlushFrequency() >= 0, "writeBehindFlushFrequency >= 0");
+ apply(assertParam, cc.getWriteBehindFlushThreadCount() > 0, "writeBehindFlushThreadCount > 0");
+
+ if (cc.getWriteBehindFlushSize() == 0 && cc.getWriteBehindFlushFrequency() == 0)
+ throw new IgniteCheckedException("Cannot set both 'writeBehindFlushFrequency' and " +
+ "'writeBehindFlushSize' parameters to 0 for cache: " + U.maskName(cc.getName()));
+ }
+
+ if (cc.isReadThrough() && cfgStore == null
+ && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
+ throw new IgniteCheckedException("Cannot enable read-through (loader or store is not provided) " +
+ "for cache: " + U.maskName(cc.getName()));
+
+ if (cc.isWriteThrough() && cfgStore == null
+ && ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
+ throw new IgniteCheckedException("Cannot enable write-through (writer or store is not provided) " +
+ "for cache: " + U.maskName(cc.getName()));
+
+ long delay = cc.getRebalanceDelay();
+
+ if (delay != 0) {
+ if (cc.getCacheMode() != PARTITIONED)
+ U.warn(log, "Rebalance delay is supported only for partitioned caches (will ignore): " + (cc.getName()));
+ else if (cc.getRebalanceMode() == SYNC) {
+ if (delay < 0) {
+ U.warn(log, "Ignoring SYNC rebalance mode with manual rebalance start (node will not wait for " +
+ "rebalancing to be finished): " + U.maskName(cc.getName()));
+ }
+ else {
+ U.warn(log, "Using SYNC rebalance mode with rebalance delay (node will wait until rebalancing is " +
+ "initiated for " + delay + "ms) for cache: " + U.maskName(cc.getName()));
+ }
+ }
+ }
+
+ ctx.coordinators().validateCacheConfiguration(cc);
+
+ if (cc.getAtomicityMode() == ATOMIC)
+ apply(assertParam, cc.getTransactionManagerLookupClassName() == null,
+ "transaction manager can not be used with ATOMIC cache");
+
+ if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null) && !cc.isOnheapCacheEnabled())
+ throw new IgniteCheckedException("Onheap cache must be enabled if eviction policy is configured [cacheName="
+ + U.maskName(cc.getName()) + "]");
+
+ if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isDataStructureCache(cc.getName()))
+ throw new IgniteCheckedException("Using cache names reserved for datastructures is not allowed for " +
+ "other cache types [cacheName=" + cc.getName() + ", cacheType=" + cacheType + "]");
+
+ if (cacheType != CacheType.DATA_STRUCTURES && DataStructuresProcessor.isReservedGroup(cc.getGroupName()))
+ throw new IgniteCheckedException("Using cache group names reserved for datastructures is not allowed for " +
+ "other cache types [cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
+ ", cacheType=" + cacheType + "]");
+
+ // Make sure we do not use sql schema for system views.
+ if (ctx.query().moduleEnabled()) {
+ String schema = QueryUtils.normalizeSchemaName(cc.getName(), cc.getSqlSchema());
+
+ if (F.eq(schema, QueryUtils.SCHEMA_SYS)) {
+ if (cc.getSqlSchema() == null) {
+ // Conflict on cache name.
+ throw new IgniteCheckedException("SQL schema name derived from cache name is reserved (" +
+ "please set explicit SQL schema name through CacheConfiguration.setSqlSchema() or choose " +
+ "another cache name) [cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + "]");
+ }
+ else {
+ // Conflict on schema name.
+ throw new IgniteCheckedException("SQL schema name is reserved (please choose another one) [" +
+ "cacheName=" + cc.getName() + ", schemaName=" + cc.getSqlSchema() + ']');
+ }
+ }
+ }
+
+ if (cc.isEncryptionEnabled() && !ctx.clientNode()) {
+ if (!CU.isPersistentCache(cc, c.getDataStorageConfiguration())) {
+ throw new IgniteCheckedException("Using encryption is not allowed" +
+ " for not persistent cache [cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
+ ", cacheType=" + cacheType + "]");
+ }
+
+ EncryptionSpi encSpi = c.getEncryptionSpi();
+
+ if (encSpi == null) {
+ throw new IgniteCheckedException("EncryptionSpi should be configured to use encrypted cache " +
+ "[cacheName=" + cc.getName() + ", groupName=" + cc.getGroupName() +
+ ", cacheType=" + cacheType + "]");
+ }
+ }
+
+ Collection<QueryEntity> ents = cc.getQueryEntities();
+
+ if (ctx.discovery().discoCache() != null) {
+ boolean nonDfltPrecScaleExists = ents.stream().anyMatch(
+ e -> !F.isEmpty(e.getFieldsPrecision()) || !F.isEmpty(e.getFieldsScale()));
+
+ if (nonDfltPrecScaleExists) {
+ ClusterNode oldestNode = ctx.discovery().discoCache().oldestServerNode();
+
+ if (PRECISION_SCALE_SINCE_VER.compareTo(oldestNode.version()) > 0) {
+ throw new IgniteCheckedException("Non default precision and scale is supported since version 2.7. " +
+ "The node with oldest version [node=" + oldestNode + ']');
+ }
+ }
+ }
+ }
+
+ /**
+  * Validates the local node configuration against every remote node returned by discovery.
+  * Remote nodes whose {@code ATTR_CONSISTENCY_CHECK_SKIPPED} attribute equals {@code true} are not checked.
+  *
+  * @param ctx Context.
+  * @param log Logger.
+  * @throws IgniteCheckedException if check failed.
+  */
+ static void checkConsistency(GridKernalContext ctx, IgniteLogger log) throws IgniteCheckedException {
+     Collection<ClusterNode> remotes = ctx.discovery().remoteNodes();
+
+     // The rebalance pool size comparison below runs only when this feature check reports false.
+     boolean poolSizeMayDiffer =
+         IgniteFeatures.allNodesSupports(remotes, IgniteFeatures.DIFFERENT_REBALANCE_POOL_SIZE);
+
+     for (ClusterNode rmt : remotes) {
+         boolean checkSkipped = Boolean.TRUE.equals(rmt.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED));
+
+         if (!checkSkipped) {
+             if (!poolSizeMayDiffer)
+                 checkRebalanceConfiguration(rmt, ctx);
+
+             checkTransactionConfiguration(rmt, ctx, log);
+
+             checkMemoryConfiguration(rmt, ctx);
+
+             DeploymentMode locMode = ctx.config().getDeploymentMode();
+             DeploymentMode rmtMode = rmt.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+
+             CU.checkAttributeMismatch(log, null, rmt.id(), "deploymentMode", "Deployment mode",
+                 locMode, rmtMode, true);
+         }
+     }
+ }
+
+ /**
+  * Compares persistence settings of the joining node's data regions with the local default region and with
+  * same-named regions of every alive server node, collecting one message per mismatch.
+  *
+  * @param rmtNode Joining node.
+  * @param ctx Context.
+  * @return List of validation errors (empty if no mismatch was found).
+  */
+ private static List<String> validateRmtRegions(ClusterNode rmtNode, GridKernalContext ctx) {
+     List<String> errs = new ArrayList<>();
+
+     DataStorageConfiguration rmtStorageCfg = extractDataStorage(rmtNode, ctx);
+     Map<String, DataRegionConfiguration> rmtRegions = dataRegionCfgs(rmtStorageCfg);
+
+     DataStorageConfiguration locStorageCfg = ctx.config().getDataStorageConfiguration();
+
+     boolean locDfltPersistent = isDefaultDataRegionPersistent(locStorageCfg);
+     boolean rmtDfltPersistent = isDefaultDataRegionPersistent(rmtStorageCfg);
+
+     if (locDfltPersistent != rmtDfltPersistent) {
+         errs.add(String.format(
+             INVALID_REGION_CONFIGURATION_MESSAGE,
+             "DEFAULT",
+             ctx.localNodeId(),
+             locDfltPersistent,
+             rmtNode.id(),
+             rmtDfltPersistent
+         ));
+     }
+
+     for (ClusterNode srvNode : ctx.discovery().aliveServerNodes()) {
+         Map<String, DataRegionConfiguration> srvRegions = dataRegionCfgs(extractDataStorage(srvNode, ctx));
+
+         for (Map.Entry<String, DataRegionConfiguration> srvEntry : srvRegions.entrySet()) {
+             String regionName = srvEntry.getKey();
+
+             DataRegionConfiguration rmtRegion = rmtRegions.get(regionName);
+
+             // Regions unknown to the joining node, or with matching persistence, are fine.
+             if (rmtRegion == null || rmtRegion.isPersistenceEnabled() == srvEntry.getValue().isPersistenceEnabled())
+                 continue;
+
+             // NOTE(review): the node id reported here is the local one, while the persistence flag next to it
+             // comes from srvNode's configuration; confirm whether srvNode.id() was intended.
+             errs.add(String.format(
+                 INVALID_REGION_CONFIGURATION_MESSAGE,
+                 regionName,
+                 ctx.localNodeId(),
+                 srvEntry.getValue().isPersistenceEnabled(),
+                 rmtNode.id(),
+                 rmtRegion.isPersistenceEnabled()
+             ));
+         }
+     }
+
+     return errs;
+ }
+
+ /**
+  * Evaluates the given assertion function and throws the exception it returns, if any.
+  *
+  * @param assertParam Assert parameter.
+  * @param cond The condition result.
+  * @param condDesc The description of condition.
+  * @throws IgniteCheckedException The exception returned by {@code assertParam}, when it is not {@code null}.
+  */
+ private static void apply(
+     BiFunction<Boolean, String, IgniteCheckedException> assertParam,
+     Boolean cond,
+     String condDesc
+ ) throws IgniteCheckedException {
+     IgniteCheckedException failure = assertParam.apply(cond, condDesc);
+
+     if (failure == null)
+         return;
+
+     throw failure;
+ }
+
+ /**
+  * Tells whether a client-mode node without data storage configuration would hold data of the given cache.
+  *
+  * @param c Ignite Configuration.
+  * @param cc Cache Configuration.
+  * @param ctx Context.
+  * @return {@code true} only when the node is in client mode, has no data storage configuration, and the cache
+  *      is either {@code LOCAL} or has the local node as an affinity node; {@code false} otherwise.
+  */
+ private static boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc, GridKernalContext ctx) {
+     // Only client nodes lacking a data storage configuration are of interest.
+     if (!c.isClientMode() || c.getDataStorageConfiguration() != null)
+         return false;
+
+     return cc.getCacheMode() == LOCAL
+         || ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
+ }
+
+ /**
+  * Fails when the remote node advertises a rebalance thread pool size different from the local one.
+  * The check is skipped if either node is a client or a daemon, or if the remote attribute is absent.
+  *
+  * @param rmt Remote node to check.
+  * @param ctx Context.
+  * @throws IgniteCheckedException If check failed.
+  */
+ private static void checkRebalanceConfiguration(
+     ClusterNode rmt,
+     GridKernalContext ctx
+ ) throws IgniteCheckedException {
+     ClusterNode locNode = ctx.discovery().localNode();
+
+     boolean bothRegularServers =
+         !ctx.config().isClientMode() && !locNode.isDaemon() && !rmt.isClient() && !rmt.isDaemon();
+
+     if (!bothRegularServers)
+         return;
+
+     Integer rmtPoolSize = rmt.attribute(IgniteNodeAttributes.ATTR_REBALANCE_POOL_SIZE);
+
+     if (rmtPoolSize == null || rmtPoolSize == ctx.config().getRebalanceThreadPoolSize())
+         return;
+
+     throw new IgniteCheckedException("Rebalance configuration mismatch (fix configuration or set -D" +
+         IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)." +
+         " Different values of such parameter may lead to rebalance process instability and hanging. " +
+         " [rmtNodeId=" + rmt.id() +
+         ", locRebalanceThreadPoolSize = " + ctx.config().getRebalanceThreadPoolSize() +
+         ", rmtRebalanceThreadPoolSize = " + rmtPoolSize + "]");
+ }
+
+ /**
+  * Compares the remote node's transaction configuration with the local one.
+  * Nothing is checked when the remote node does not expose the {@code ATTR_TX_CONFIG} attribute.
+  *
+  * @param rmt Remote node to check.
+  * @param ctx Context.
+  * @param log Logger.
+  * @throws IgniteCheckedException If check failed.
+  */
+ private static void checkTransactionConfiguration(
+     ClusterNode rmt,
+     GridKernalContext ctx,
+     IgniteLogger log
+ ) throws IgniteCheckedException {
+     TransactionConfiguration rmtTxCfg = rmt.attribute(ATTR_TX_CONFIG);
+
+     if (rmtTxCfg == null)
+         return;
+
+     TransactionConfiguration locTxCfg = ctx.config().getTransactionConfiguration();
+
+     checkDeadlockDetectionConfig(rmt, rmtTxCfg, locTxCfg, log);
+
+     checkSerializableEnabledConfig(rmt, rmtTxCfg, locTxCfg);
+ }
+
+ /**
+  * Logs a warning when deadlock detection (deadlock timeout greater than zero) is enabled on exactly one of
+  * the two nodes. The mismatch is only reported; no exception is thrown.
+  *
+  * @param rmt Remote node whose id is reported in the warning.
+  * @param rmtTxCfg Remote transaction configuration.
+  * @param locTxCfg Local transaction configuration.
+  * @param log Logger.
+  */
+ private static void checkDeadlockDetectionConfig(
+     ClusterNode rmt,
+     TransactionConfiguration rmtTxCfg,
+     TransactionConfiguration locTxCfg,
+     IgniteLogger log
+ ) {
+     boolean locDeadlockDetectionEnabled = locTxCfg.getDeadlockTimeout() > 0;
+     boolean rmtDeadlockDetectionEnabled = rmtTxCfg.getDeadlockTimeout() > 0;
+
+     if (locDeadlockDetectionEnabled != rmtDeadlockDetectionEnabled) {
+         // The details section is closed with ']' so the message is well-formed like the other mismatch messages.
+         U.warn(log, "Deadlock detection is enabled on one node and disabled on another. " +
+             "Disabled detection on one node can lead to undetected deadlocks. [rmtNodeId=" + rmt.id() +
+             ", locDeadlockTimeout=" + locTxCfg.getDeadlockTimeout() +
+             ", rmtDeadlockTimeout=" + rmtTxCfg.getDeadlockTimeout() + ']');
+     }
+ }
+
+ /**
+  * Fails when the local and remote transaction configurations disagree on {@code txSerializableEnabled}.
+  *
+  * @param rmt Remote node whose id is reported in the error.
+  * @param rmtTxCfg Remote transaction configuration.
+  * @param locTxCfg Local transaction configuration.
+  * @throws IgniteCheckedException If the flag values differ.
+  */
+ private static void checkSerializableEnabledConfig(
+     ClusterNode rmt,
+     TransactionConfiguration rmtTxCfg,
+     TransactionConfiguration locTxCfg
+ ) throws IgniteCheckedException {
+     if (locTxCfg.isTxSerializableEnabled() == rmtTxCfg.isTxSerializableEnabled())
+         return;
+
+     throw new IgniteCheckedException("Serializable transactions enabled mismatch " +
+         "(fix txSerializableEnabled property or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true " +
+         "system property) [rmtNodeId=" + rmt.id() +
+         ", locTxSerializableEnabled=" + locTxCfg.isTxSerializableEnabled() +
+         ", rmtTxSerializableEnabled=" + rmtTxCfg.isTxSerializableEnabled() + ']');
+ }
+
+ /**
+  * Fails when the remote node's page size differs from the local one. The remote value is read from the
+  * data storage configuration attribute or, if that is unavailable, from the legacy memory configuration
+  * attribute. The check is skipped for client and daemon nodes and when neither attribute is present.
+  *
+  * @param rmt Remote node to check.
+  * @param ctx Context.
+  * @throws IgniteCheckedException If check failed.
+  */
+ private static void checkMemoryConfiguration(ClusterNode rmt, GridKernalContext ctx) throws IgniteCheckedException {
+     ClusterNode locNode = ctx.discovery().localNode();
+
+     if (ctx.config().isClientMode() || locNode.isDaemon() || rmt.isClient() || rmt.isDaemon())
+         return;
+
+     DataStorageConfiguration rmtDsCfg = null;
+
+     Object rawDsCfg = rmt.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG);
+
+     if (rawDsCfg instanceof byte[])
+         rmtDsCfg = new JdkMarshaller().unmarshal((byte[])rawDsCfg, U.resolveClassLoader(ctx.config()));
+
+     if (rmtDsCfg == null) {
+         // Try to use legacy memory configuration.
+         MemoryConfiguration legacyCfg = rmt.attribute(IgniteNodeAttributes.ATTR_MEMORY_CONFIG);
+
+         // Nothing to compare against.
+         if (legacyCfg == null)
+             return;
+
+         rmtDsCfg = new DataStorageConfiguration();
+
+         // All properties that are used in validation should be converted here.
+         rmtDsCfg.setPageSize(legacyCfg.getPageSize());
+     }
+
+     DataStorageConfiguration locDsCfg = ctx.config().getDataStorageConfiguration();
+
+     if (rmtDsCfg.getPageSize() == locDsCfg.getPageSize())
+         return;
+
+     throw new IgniteCheckedException("Memory configuration mismatch (fix configuration or set -D" +
+         IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property) [rmtNodeId=" + rmt.id() +
+         ", locPageSize = " + locDsCfg.getPageSize() + ", rmtPageSize = " + rmtDsCfg.getPageSize() + "]");
+ }
+
+ /**
+  * For every cache using {@code RendezvousAffinityFunction}, checks that the hash object resolved for the
+  * joining node does not have the same {@code hashCode()} as the one resolved for any alive server node.
+  * Client nodes are not validated.
+  *
+  * @param node Joining node.
+  * @param ctx Context.
+  * @param map Cache descriptors.
+  * @return Validation result or {@code null} in case of success.
+  */
+ @Nullable static IgniteNodeValidationResult validateHashIdResolvers(
+     ClusterNode node,
+     GridKernalContext ctx,
+     Map<String, DynamicCacheDescriptor> map
+ ) {
+     if (node.isClient())
+         return null;
+
+     for (DynamicCacheDescriptor desc : map.values()) {
+         CacheConfiguration ccfg = desc.cacheConfiguration();
+
+         if (!(ccfg.getAffinity() instanceof RendezvousAffinityFunction))
+             continue;
+
+         RendezvousAffinityFunction aff = (RendezvousAffinityFunction)ccfg.getAffinity();
+
+         Object joiningHash = aff.resolveNodeHash(node);
+
+         for (ClusterNode topNode : ctx.discovery().aliveServerNodes()) {
+             Object existingHash = aff.resolveNodeHash(topNode);
+
+             if (joiningHash.hashCode() != existingHash.hashCode())
+                 continue;
+
+             // The same text is used for both message arguments of the validation result.
+             String msg = "Failed to add node to topology because it has the same hash code for " +
+                 "partitioned affinity as one of existing nodes [cacheName=" +
+                 ccfg.getName() + ", existingNodeId=" + topNode.id() + ']';
+
+             return new IgniteNodeValidationResult(topNode.id(), msg, msg);
+         }
+     }
+
+     return null;
+ }
+
+ /**
+  * Delegates to {@code GridCacheUtils.extractDataStorage} using the context's JDK marshaller and the class
+  * loader resolved from the node configuration.
+  *
+  * @param rmtNode Remote node to check.
+  * @param ctx Context.
+  * @return Data storage configuration
+  */
+ private static DataStorageConfiguration extractDataStorage(ClusterNode rmtNode, GridKernalContext ctx) {
+     JdkMarshaller marsh = ctx.marshallerContext().jdkMarshaller();
+     ClassLoader clsLdr = U.resolveClassLoader(ctx.config());
+
+     return GridCacheUtils.extractDataStorage(rmtNode, marsh, clsLdr);
+ }
+
+ /**
+  * Indexes the explicitly configured data regions of the given storage configuration by region name.
+  *
+  * @param dataStorageCfg User-defined data regions.
+  * @return Map from region name to its configuration; empty when {@code dataStorageCfg} is {@code null} or
+  *      has no region configurations.
+  */
+ private static Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
+     if (dataStorageCfg == null)
+         return Collections.emptyMap();
+
+     DataRegionConfiguration[] regionArr = dataStorageCfg.getDataRegionConfigurations();
+
+     Stream<DataRegionConfiguration> regions = regionArr == null ? Stream.empty() : Stream.of(regionArr);
+
+     return regions.collect(Collectors.toMap(DataRegionConfiguration::getName, region -> region));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
index cc573e5..93b2fbb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBaselineTopologyTest.java
@@ -817,7 +817,11 @@ public class CacheBaselineTopologyTest extends GridCommonAbstractTest {
stopAllGrids();
- startGrids(5);
+ startGrid(1); //TODO https://issues.apache.org/jira/browse/IGNITE-8717 (replace with startGrids(5); //after)
+ startGrid(0);
+ startGrid(2);
+ startGrid(3);
+ startGrid(4);
GridTestUtils.waitForCondition(() -> grid(0).cluster().active(), getTestTimeout());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
index 3c42b30..ed274ec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDestroyCacheTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence;
import org.apache.ignite.Ignite;
+import org.junit.Ignore;
import org.junit.Test;
/**
@@ -61,6 +62,7 @@ public class IgnitePdsDestroyCacheTest extends IgnitePdsDestroyCacheAbstractTest
*
* @throws Exception If failed.
*/
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-8717")
@Test
public void testDestroyCachesAbruptly() throws Exception {
Ignite ignite = startGrids(NODES);
@@ -77,6 +79,7 @@ public class IgnitePdsDestroyCacheTest extends IgnitePdsDestroyCacheAbstractTest
*
* @throws Exception If failed.
*/
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-8717")
@Test
public void testDestroyGroupCachesAbruptly() throws Exception {
Ignite ignite = startGrids(NODES);
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
index 7178feb..06f92f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerMappingConsistencyTest.java
@@ -148,8 +148,8 @@ public class GridMarshallerMappingConsistencyTest extends GridCommonAbstractTest
stopAllGrids();
- Ignite g2 = startGrid(2);
startGrid(1);
+ Ignite g2 = startGrid(2);
assertTrue("Failed to wait for automatic grid activation",
GridTestUtils.waitForCondition(() -> g2.cluster().active(), getTestTimeout()));
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java
index 865aee5..833af0e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteCacheGroupsWithRestartsTest.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
@@ -39,8 +40,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.commandline.CommandHandler;
import org.apache.ignite.internal.commandline.cache.argument.FindAndDeleteGarbageArg;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersistenceJobResult;
import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersistenceTask;
@@ -48,18 +51,16 @@ import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersist
import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersistenceTaskResult;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
/**
- * Testing corner cases in cache group functionality:
- * -stopping cache in shared group and immediate node leaving;
- * -starting cache in shared group with the same name as destroyed one;
- * -etc.
+ * Tests corner cases in cache group functionality: stopping a cache in a shared group followed by an immediate
+ * node leave; starting a cache in a shared group with the same name as a destroyed one; etc.
*/
@WithSystemProperty(key=IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, value="true")
@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
@@ -67,10 +68,17 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
/** Group name. */
public static final String GROUP = "group";
+ /**
+ * When {@code true}, {@link #getConfiguration(String)} additionally sets the cache configuration returned by
+ * {@code getCacheConfiguration(3)} on the node configuration it builds.
+ */
+ private volatile boolean startExtraStaticCache;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration configuration = super.getConfiguration(gridName);
+ configuration.setConsistentId(gridName);
+
configuration.setConnectorConfiguration(new ConnectorConfiguration());
DataStorageConfiguration cfg = new DataStorageConfiguration();
@@ -81,6 +89,9 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
configuration.setDataStorageConfiguration(cfg);
+ if (startExtraStaticCache)
+ configuration.setCacheConfiguration(getCacheConfiguration(3));
+
return configuration;
}
@@ -147,7 +158,7 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
assertNull(ex.cachex(getCacheName(0)));
- IgniteProcessProxy.kill(grid(2).configuration().getIgniteInstanceName());
+ stopGrid(2, true);
startGrid(2);
@@ -164,11 +175,88 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
@Test
+ public void testNodeRestartBetweenCacheStop() throws Exception {
+     IgniteEx ex = startGrids(3);
+
+     prepareCachesAndData(ex);
+
+     stopGrid(2, true);
+
+     // The cache is destroyed while node 2 is stopped.
+     ex.destroyCache(getCacheName(0));
+
+     assertNull(ex.cachex(getCacheName(0)));
+
+     try {
+         startGrid(2);
+
+         fail("Node 2 is expected to be rejected while it still has data of the destroyed cache");
+     }
+     catch (Exception e) {
+         List<Throwable> list = X.getThrowableList(e);
+
+         // Throwables in the chain may have a null message; treat them as non-matching instead of failing with NPE.
+         assertTrue(list.stream().
+             anyMatch(x -> x.getMessage() != null && x.getMessage().
+                 contains("Joining node has caches with data which are not presented on cluster")));
+     }
+
+     // After the cache group directory of node 2 is removed, the node must be able to join.
+     removeCacheDir(getTestIgniteInstanceName(2), "cacheGroup-group");
+
+     IgniteEx node2 = startGrid(2);
+
+     assertEquals(3, node2.cluster().nodes().size());
+ }
+
+ /**
+  * Deletes the path {@code DFLT_STORE_DIR/<instance name with dots replaced by underscores>/<cacheGroup>}
+  * resolved against the default work directory.
+  *
+  * @param instanceName Instance name.
+  * @param cacheGroup Cache group.
+  * @throws IgniteCheckedException If thrown by the work directory utilities.
+  */
+ private void removeCacheDir(String instanceName, String cacheGroup) throws IgniteCheckedException {
+     String nodeDirName = instanceName.replace(".", "_");
+
+     String relPath = DFLT_STORE_DIR + "/" + nodeDirName + "/" + cacheGroup;
+
+     U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), relPath, true));
+ }
+
+ /**
+ * Restarts node 2 with an extra statically configured cache (index 3) and checks that the cache is then
+ * visible both on {@code ex} and on the restarted node, and that it is empty.
+ *
+ * @throws Exception If failed.
+ */
+ @Ignore("https://ggsystems.atlassian.net/browse/GG-21755")
+ @Test
+ public void testNodeRestartWithNewStaticallyConfiguredCache() throws Exception {
+ IgniteEx ex = startGrids(3);
+
+ prepareCachesAndData(ex);
+
+ stopGrid(2, true);
+
+ // The extra cache must not be present before node 2 is started again.
+ assertNull(ex.cachex(getCacheName(3)));
+
+ // Make getConfiguration() add the extra cache for this start only.
+ startExtraStaticCache = true;
+
+ IgniteEx node2;
+ try {
+ node2 = startGrid(2);
+ }
+ finally {
+ // Reset the flag even if the start fails so that later node starts are not affected.
+ startExtraStaticCache = false;
+ }
+
+ assertNotNull(ex.cachex(getCacheName(3)));
+ assertNotNull(node2.cachex(getCacheName(3)));
+
+ IgniteCache<Object, Object> cache = ex.cache(getCacheName(3));
+
+ assertEquals(0, cache.size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
public void testCleaningGarbageAfterCacheDestroyedAndNodeStop() throws Exception {
testFindAndDeleteGarbage(this::executeTask);
}
-
/**
* @throws Exception If failed.
*/
@@ -245,24 +333,24 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite to execute task on.
- * @param deleteFoundGarbage If clearing mode should be used.
+ * @param delFoundGarbage If clearing mode should be used.
* @return Result of task run.
*/
private VisorFindAndDeleteGarbageInPersistenceTaskResult executeTaskViaControlConsoleUtil(
IgniteEx ignite,
- boolean deleteFoundGarbage
+ boolean delFoundGarbage
) {
- CommandHandler handler = new CommandHandler();
+ CommandHandler hnd = new CommandHandler();
List<String> args = new ArrayList<>(Arrays.asList("--yes", "--port", "11212", "--cache", "find_garbage",
ignite.localNode().id().toString()));
- if (deleteFoundGarbage)
+ if (delFoundGarbage)
args.add(FindAndDeleteGarbageArg.DELETE.argName());
- handler.execute(args);
+ hnd.execute(args);
- return handler.getLastOperationResult();
+ return hnd.getLastOperationResult();
}
/**
@@ -288,7 +376,9 @@ public class IgniteCacheGroupsWithRestartsTest extends GridCommonAbstractTest {
*
*/
static class Account {
- /** */
+ /**
+ *
+ */
private final int val;
/**