You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/18 08:34:13 UTC
[1/6] ignite git commit: Moved logic related to caches discovery data
handling to ClusterCachesInfo. Start of statically configured caches in the
same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Repository: ignite
Updated Branches:
refs/heads/master e84d23787 -> 201769640
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 1023140..e423098 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughS
import org.apache.ignite.internal.processors.cache.IgniteCacheInvokeReadThroughTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheLoadRebalanceEvictionSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughStoreCallTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheStartTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxLocalPeekModesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxNearPeekModesTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheTxPeekModesTest;
@@ -83,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest
import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheDiscoveryDataConcurrentJoinTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest;
@@ -222,6 +224,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class);
suite.addTestSuite(IgniteCacheCreatePutTest.class);
suite.addTestSuite(CacheStartOnJoinTest.class);
+ suite.addTestSuite(IgniteCacheStartTest.class);
+ suite.addTestSuite(CacheDiscoveryDataConcurrentJoinTest.class);
suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
index 9389646..0fd16f0 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
@@ -77,7 +77,7 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest {
}
return null;
}
- }, IgniteException.class, "Failed to load bean in application context");
+ }, IgniteException.class, "Spring bean with provided name doesn't exist");
}
/**
[6/6] ignite git commit: Merge remote-tracking branch 'origin/master'
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/20176964
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/20176964
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/20176964
Branch: refs/heads/master
Commit: 201769640cf4a48ecb0728d5edb0b16c682ee46b
Parents: 8600200 e84d237
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 18 11:33:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 18 11:33:54 2017 +0300
----------------------------------------------------------------------
.../dotnet/Apache.Ignite.Core/Binary/TimestampAttribute.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[5/6] ignite git commit: Moved logic related to caches discovery data
handling to ClusterCachesInfo. Start of statically configured caches in the
same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Posted by sb...@apache.org.
Moved logic related to caches discovery data handling to ClusterCachesInfo.
Start of statically configured caches in the same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86002002
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86002002
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86002002
Branch: refs/heads/master
Commit: 86002002fda063eb33431e8bfd3f3a0df047325b
Parents: 20f4d18
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 18 11:33:25 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 18 11:33:25 2017 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 11 +-
.../affinity/AffinityTopologyVersion.java | 9 +
.../cache/CacheAffinitySharedManager.java | 286 ++--
.../CacheClientReconnectDiscoveryData.java | 133 ++
.../internal/processors/cache/CacheData.java | 157 ++
.../cache/CacheJoinNodeDiscoveryData.java | 147 ++
.../cache/CacheNodeCommonDiscoveryData.java | 82 +
.../processors/cache/ClusterCachesInfo.java | 903 +++++++++++
.../cache/DynamicCacheChangeBatch.java | 92 +-
.../cache/DynamicCacheChangeRequest.java | 102 +-
.../cache/DynamicCacheDescriptor.java | 171 +--
.../processors/cache/ExchangeActions.java | 338 +++++
.../processors/cache/GridCacheContext.java | 26 +-
.../GridCachePartitionExchangeManager.java | 54 +-
.../processors/cache/GridCacheProcessor.java | 1413 +++++-------------
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtAffinityAssignmentRequest.java | 36 +-
.../dht/GridDhtAffinityAssignmentResponse.java | 36 +-
.../dht/GridDhtAssignmentFetchFuture.java | 63 +-
.../dht/GridDhtPartitionTopologyImpl.java | 12 +-
.../GridDhtAtomicAbstractUpdateRequest.java | 2 +
.../GridDhtPartitionsExchangeFuture.java | 160 +-
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../processors/cache/local/GridLocalCache.java | 31 +-
.../cache/local/GridLocalLockFuture.java | 41 +-
.../cache/query/GridCacheQueryManager.java | 3 +-
.../continuous/CacheContinuousQueryHandler.java | 42 +-
.../continuous/CacheContinuousQueryManager.java | 11 +-
.../cluster/GridClusterStateProcessor.java | 26 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 2 +-
.../ignite/spi/discovery/DiscoveryDataBag.java | 18 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 4 +-
.../core/src/test/config/examples.properties | 1 +
...ityFunctionBackupFilterAbstractSelfTest.java | 13 +-
.../internal/GridNodeMetricsLogSelfTest.java | 13 +-
.../GridCacheAbstractLocalStoreSelfTest.java | 41 +-
...gniteCacheP2pUnmarshallingNearErrorTest.java | 6 +-
.../processors/cache/IgniteCacheStartTest.java | 191 +++
.../cache/IgniteDynamicCacheStartSelfTest.java | 2 +-
...niteTopologyValidatorGridSplitCacheTest.java | 8 +-
.../CacheDiscoveryDataConcurrentJoinTest.java | 198 +++
.../CacheLateAffinityAssignmentTest.java | 4 +-
.../cache/distributed/CacheStartOnJoinTest.java | 10 +
.../IgniteCrossCacheTxStoreSelfTest.java | 12 +-
...ePartitionedBasicStoreMultiNodeSelfTest.java | 22 +-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +
.../testsuites/IgniteCacheTestSuite4.java | 4 +
.../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 2 +-
48 files changed, 3198 insertions(+), 1750 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index e5f2278..24c7283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -315,8 +315,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
boolean nearEnabled,
CacheMode cacheMode
) {
- if (!registeredCaches.containsKey(cacheName))
+ if (!registeredCaches.containsKey(cacheName)) {
+ if (cacheMode == CacheMode.REPLICATED)
+ nearEnabled = false;
+
registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode));
+ }
}
/**
@@ -2737,7 +2741,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if this node is a data node for given cache.
*/
public boolean dataNode(ClusterNode node) {
- return !node.isDaemon() && CU.affinityNode(node, cacheFilter);
+ return CU.affinityNode(node, cacheFilter);
}
/**
@@ -2753,9 +2757,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if near cache is present on the given nodes.
*/
public boolean nearNode(ClusterNode node) {
- if (node.isDaemon())
- return false;
-
if (CU.affinityNode(node, cacheFilter))
return nearEnabled;
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index f564e28..8669530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -73,6 +73,15 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
}
/**
+ * @return Topology version with incremented minor version.
+ */
+ public AffinityTopologyVersion nextMinorVersion() {
+ assert topVer > 0;
+
+ return new AffinityTopologyVersion(topVer, minorTopVer + 1);
+ }
+
+ /**
* @return Topology version.
*/
public long topologyVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0958208..cec42a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -48,7 +49,6 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -90,8 +90,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
private final Object mux = new Object();
/** Pending affinity assignment futures. */
- private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
- pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
+ new ConcurrentHashMap8<>();
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@@ -312,50 +312,59 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
@Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- processAffinityAssignmentResponse(cacheId, nodeId, res);
+ processAffinityAssignmentResponse(nodeId, res);
}
});
}
}
/**
+ * @param exchActions Cache change requests to execute on exchange.
+ */
+ private void updateCachesInfo(ExchangeActions exchActions) {
+ for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+ DynamicCacheDescriptor desc = registeredCaches.remove(action.descriptor().cacheId());
+
+ assert desc != null : action.request().cacheName();
+ }
+
+ for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) {
+ DynamicCacheChangeRequest req = action.request();
+
+ Integer cacheId = action.descriptor().cacheId();
+
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
+ req.startCacheConfiguration(),
+ req.cacheType(),
+ false,
+ action.descriptor().receivedFrom(),
+ action.descriptor().staticallyConfigured(),
+ req.deploymentId(),
+ req.schema());
+
+ DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
+
+ assert old == null : old;
+ }
+ }
+
+ /**
* Called on exchange initiated for cache start/stop request.
*
* @param fut Exchange future.
* @param crd Coordinator flag.
- * @param reqs Cache change requests.
+ * @param exchActions Cache change requests.
* @throws IgniteCheckedException If failed.
* @return {@code True} if client-only exchange is needed.
*/
public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
boolean crd,
- Collection<DynamicCacheChangeRequest> reqs)
- throws IgniteCheckedException {
- assert !F.isEmpty(reqs) : fut;
-
- for (DynamicCacheChangeRequest req : reqs) {
- Integer cacheId = CU.cacheId(req.cacheName());
-
- if (req.stop()) {
- DynamicCacheDescriptor desc = registeredCaches.remove(cacheId);
-
- assert desc != null : cacheId;
- }
- else if (req.start() && !req.clientStartOnly()) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(),
- req.startCacheConfiguration(),
- req.cacheType(),
- false,
- req.deploymentId(),
- req.schema());
-
- DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc);
-
- assert old == null : old;
- }
- }
+ ExchangeActions exchActions)
+ throws IgniteCheckedException
+ {
+ assert exchActions != null && !exchActions.empty() : exchActions;
- boolean clientOnly = true;
+ updateCachesInfo(exchActions);
// Affinity did not change for existing caches.
forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@@ -367,87 +376,103 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
});
- Set<Integer> stoppedCaches = null;
+ for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+ DynamicCacheDescriptor cacheDesc = action.descriptor();
- for (DynamicCacheChangeRequest req : reqs) {
- if (!(req.clientStartOnly() || req.close()))
- clientOnly = false;
+ DynamicCacheChangeRequest req = action.request();
- Integer cacheId = CU.cacheId(req.cacheName());
+ boolean startCache;
- if (req.start()) {
- cctx.cache().prepareCacheStart(req, fut.topologyVersion());
+ NearCacheConfiguration nearCfg = null;
- if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
+ if (cctx.localNodeId().equals(req.initiatingNodeId())) {
+ startCache = true;
+
+ nearCfg = req.nearCacheConfiguration();
+ }
+ else {
+ startCache = cctx.cacheContext(action.descriptor().cacheId()) == null &&
+ CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter());
+ }
+
+ if (startCache) {
+ cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion());
+
+ if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) {
if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
}
+ }
- if (!crd || !lateAffAssign) {
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ if (!crd || !lateAffAssign) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheDesc.cacheId());
- if (cacheCtx != null && !cacheCtx.isLocal()) {
- boolean clientCacheStarted =
- req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
+ if (cacheCtx != null && !cacheCtx.isLocal()) {
+ boolean clientCacheStarted =
+ req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
- if (clientCacheStarted)
- initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
- else if (!req.clientStartOnly()) {
- assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ if (clientCacheStarted)
+ initAffinity(cacheDesc, cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+ else if (!req.clientStartOnly()) {
+ assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
- GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+ GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
- assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
+ assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
- List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
- fut.discoveryEvent(), fut.discoCache());
+ List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
+ fut.discoveryEvent(), fut.discoCache());
- aff.initialize(fut.topologyVersion(), assignment);
- }
+ aff.initialize(fut.topologyVersion(), assignment);
}
}
- else
- initStartedCacheOnCoordinator(fut, cacheId);
}
- else if (req.stop() || req.close()) {
- cctx.cache().blockGateway(req);
+ else
+ initStartedCacheOnCoordinator(fut, cacheDesc.cacheId());
+ }
- if (crd) {
- boolean rmvCache = false;
+ for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) {
+ Integer cacheId = CU.cacheId(req.cacheName());
- if (req.close() && req.initiatingNodeId().equals(cctx.localNodeId())) {
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ cctx.cache().blockGateway(req);
- rmvCache = cacheCtx != null && !cacheCtx.affinityNode();
- }
- else if (req.stop())
- rmvCache = true;
+ if (crd) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
- if (rmvCache) {
- CacheHolder cache = caches.remove(cacheId);
+ // Client cache was stopped, need to create 'client' CacheHolder.
+ if (cacheCtx != null && !cacheCtx.affinityNode()) {
+ CacheHolder cache = caches.remove(cacheId);
- if (cache != null) {
- if (!req.stop()) {
- assert !cache.client();
+ assert !cache.client() : cache;
- cache = CacheHolder2.create(cctx,
- cctx.cache().cacheDescriptor(cacheId),
- fut,
- cache.affinity());
+ cache = CacheHolder2.create(cctx,
+ cctx.cache().cacheDescriptor(cacheId),
+ fut,
+ cache.affinity());
- caches.put(cacheId, cache);
- }
- else {
- if (stoppedCaches == null)
- stoppedCaches = new HashSet<>();
+ caches.put(cacheId, cache);
+ }
+ }
+ }
- stoppedCaches.add(cache.cacheId());
+ Set<Integer> stoppedCaches = null;
- cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class);
- }
- }
- }
- }
+ for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+ DynamicCacheDescriptor desc = action.descriptor();
+
+ cctx.cache().blockGateway(action.request());
+
+ if (crd && lateAffAssign && desc.cacheConfiguration().getCacheMode() != LOCAL) {
+ CacheHolder cache = caches.remove(desc.cacheId());
+
+ assert cache != null : action.request();
+
+ if (stoppedCaches == null)
+ stoppedCaches = new HashSet<>();
+
+ stoppedCaches.add(cache.cacheId());
+
+ cctx.io().removeHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class);
}
}
@@ -479,7 +504,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
}
- return clientOnly;
+ return exchActions.clientOnlyExchange();
}
/**
@@ -578,7 +603,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert affTopVer.topologyVersion() > 0 : affTopVer;
- IgniteUuid deploymentId = registeredCaches.get(aff.cacheId()).deploymentId();
+ DynamicCacheDescriptor desc = registeredCaches.get(aff.cacheId());
+
+ assert desc != null : aff.cacheName();
+
+ IgniteUuid deploymentId = desc.deploymentId();
if (!deploymentId.equals(deploymentIds.get(aff.cacheId()))) {
aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
@@ -659,7 +688,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
});
}
else
- initCachesAffinity(fut);
+ initAffinityNoLateAssignment(fut);
}
}
@@ -667,7 +696,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param fut Future to add.
*/
public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
- GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.key(), fut);
+ GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);
assert old == null : "More than one thread is trying to fetch partition assignments [fut=" + fut +
", allFuts=" + pendingAssignmentFetchFuts + ']';
@@ -677,27 +706,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param fut Future to remove.
*/
public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
- boolean rmv = pendingAssignmentFetchFuts.remove(fut.key(), fut);
+ boolean rmv = pendingAssignmentFetchFuts.remove(fut.id(), fut);
- assert rmv : "Failed to remove assignment fetch future: " + fut.key();
+ assert rmv : "Failed to remove assignment fetch future: " + fut.id();
}
/**
- * @param cacheId Cache ID.
* @param nodeId Node ID.
* @param res Response.
*/
- private void processAffinityAssignmentResponse(Integer cacheId, UUID nodeId, GridDhtAffinityAssignmentResponse res) {
+ private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
if (log.isDebugEnabled())
log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
- for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) {
- if (fut.key().get1().equals(cacheId)) {
- fut.onResponse(nodeId, res);
+ GridDhtAssignmentFetchFuture fut = pendingAssignmentFetchFuts.get(res.futureId());
- break;
- }
- }
+ if (fut != null)
+ fut.onResponse(nodeId, res);
}
/**
@@ -773,7 +798,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * Initialized affinity started on this exchange.
+ * Initializes affinity for cache received from node joining on this exchange.
*
* @param crd Coordinator flag.
* @param fut Exchange future.
@@ -782,12 +807,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
*/
public void initStartedCaches(boolean crd,
final GridDhtPartitionsExchangeFuture fut,
- @Nullable Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
- if (descs != null) {
- for (DynamicCacheDescriptor desc : descs) {
- if (!registeredCaches.containsKey(desc.cacheId()))
- registeredCaches.put(desc.cacheId(), desc);
- }
+ Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
+ for (DynamicCacheDescriptor desc : descs) {
+ if (!registeredCaches.containsKey(desc.cacheId()))
+ registeredCaches.put(desc.cacheId(), desc);
}
if (crd && lateAffAssign) {
@@ -808,28 +831,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
- initAffinity(aff, fut, false);
+ initAffinity(registeredCaches.get(aff.cacheId()), aff, fut, false);
}
});
}
}
/**
+ * @param desc Cache descriptor.
* @param aff Affinity.
* @param fut Exchange future.
* @param fetch Force fetch flag.
* @throws IgniteCheckedException If failed.
*/
- private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
+ private void initAffinity(DynamicCacheDescriptor desc,
+ GridAffinityAssignmentCache aff,
+ GridDhtPartitionsExchangeFuture fut,
+ boolean fetch)
throws IgniteCheckedException {
- if (!fetch && canCalculateAffinity(aff, fut)) {
+ assert desc != null;
+
+ if (!fetch && canCalculateAffinity(desc, aff, fut)) {
List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
aff.initialize(fut.topologyVersion(), assignment);
}
else {
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
- aff.cacheName(),
+ desc,
fut.topologyVersion(),
fut.discoCache());
@@ -840,11 +869,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param desc Cache descriptor.
* @param aff Affinity.
* @param fut Exchange future.
* @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
*/
- private boolean canCalculateAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut) {
+ private boolean canCalculateAffinity(DynamicCacheDescriptor desc,
+ GridAffinityAssignmentCache aff,
+ GridDhtPartitionsExchangeFuture fut) {
+ assert desc != null : aff.cacheName();
+
// Do not request affinity from remote nodes if affinity function is not centralized.
if (!aff.centralizedAffinityFunction())
return true;
@@ -852,13 +886,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
// If local node did not initiate exchange or local node is the only cache node in grid.
Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion());
- DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId());
-
- assert cacheDesc != null : aff.cacheName();
-
- return fut.cacheStarted(aff.cacheId()) ||
+ return fut.cacheAddedOnExchange(aff.cacheId(), desc.receivedFrom()) ||
!fut.exchangeId().nodeId().equals(cctx.localNodeId()) ||
- cctx.localNodeId().equals(cacheDesc.receivedFrom()) ||
(affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
}
@@ -898,7 +927,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
}
else
- initCachesAffinity(fut);
+ initAffinityNoLateAssignment(fut);
synchronized (mux) {
affCalcVer = fut.topologyVersion();
@@ -958,7 +987,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
else {
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
- cacheCtx.name(),
+ cacheDesc,
topVer,
fut.discoCache());
@@ -971,7 +1000,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
for (int i = 0; i < fetchFuts.size(); i++) {
GridDhtAssignmentFetchFuture fetchFut = fetchFuts.get(i);
- Integer cacheId = fetchFut.key().get1();
+ Integer cacheId = fetchFut.cacheId();
fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
}
@@ -1042,7 +1071,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
centralizedAff = true;
}
else {
- initCachesAffinity(fut);
+ initAffinityNoLateAssignment(fut);
centralizedAff = false;
}
@@ -1060,14 +1089,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param fut Exchange future.
* @throws IgniteCheckedException If failed.
*/
- private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+ private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
assert !lateAffAssign;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
- initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
+ initAffinity(registeredCaches.get(cacheCtx.cacheId()), cacheCtx.affinity().affinityCache(), fut, false);
}
}
@@ -1099,7 +1128,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.io().addHandler(desc.cacheId(), GridDhtAffinityAssignmentResponse.class,
new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
@Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- processAffinityAssignmentResponse(cacheId, nodeId, res);
+ processAffinityAssignmentResponse(nodeId, res);
}
}
);
@@ -1125,7 +1154,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
- aff.cacheName(),
+ desc,
prev.topologyVersion(),
prev.discoCache());
@@ -1192,7 +1221,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class,
new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
@Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- processAffinityAssignmentResponse(cacheId, nodeId, res);
+ processAffinityAssignmentResponse(nodeId, res);
}
}
);
@@ -1714,7 +1743,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert ccfg != null : cacheDesc;
assert ccfg.getCacheMode() != LOCAL : ccfg.getName();
- assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(), fut.topologyVersion()).contains(cctx.localNode());
+ assert !cctx.discovery().cacheAffinityNodes(ccfg.getName(),
+ fut.topologyVersion()).contains(cctx.localNode()) : cacheDesc.cacheName();
AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity());
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
new file mode 100644
index 0000000..a30331f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Discovery data sent from client reconnecting to cluster.
+ */
+public class CacheClientReconnectDiscoveryData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Information about caches started on the re-joining client node, keyed by cache name. */
+ private final Map<String, CacheInfo> clientCaches;
+
+ /**
+ * @param clientCaches Information about caches started on re-joining client node.
+ */
+ CacheClientReconnectDiscoveryData(Map<String, CacheInfo> clientCaches) {
+ this.clientCaches = clientCaches;
+ }
+
+ /**
+ * @return Information about caches started on re-joining client node.
+ */
+ Map<String, CacheInfo> clientCaches() {
+ return clientCaches;
+ }
+
+ /**
+ * Information about a single cache started on the re-joining client node.
+ */
+ static class CacheInfo implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final CacheConfiguration ccfg;
+
+ /** */
+ private final CacheType cacheType;
+
+ /** */
+ private final IgniteUuid deploymentId;
+
+ /** */
+ private final boolean nearCache;
+
+ /** Flags added for future usage. */
+ private final byte flags;
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param cacheType Cache type.
+ * @param deploymentId Cache deployment ID.
+ * @param nearCache Near cache flag.
+ * @param flags Flags (for future usage).
+ */
+ CacheInfo(CacheConfiguration ccfg,
+ CacheType cacheType,
+ IgniteUuid deploymentId,
+ boolean nearCache,
+ byte flags) {
+ assert ccfg != null;
+ assert cacheType != null;
+ assert deploymentId != null;
+
+ this.ccfg = ccfg;
+ this.cacheType = cacheType;
+ this.deploymentId = deploymentId;
+ this.nearCache = nearCache;
+ this.flags = flags;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ CacheConfiguration config() {
+ return ccfg;
+ }
+
+ /**
+ * @return Cache type.
+ */
+ CacheType cacheType() {
+ return cacheType;
+ }
+
+ /**
+ * @return Cache deployment ID.
+ */
+ IgniteUuid deploymentId() {
+ return deploymentId;
+ }
+
+ /**
+ * @return Near cache flag.
+ */
+ boolean nearCache() {
+ return nearCache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheInfo.class, this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheClientReconnectDiscoveryData.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
new file mode 100644
index 0000000..4768a9a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Cache information sent in discovery data to joining node.
+ */
+public class CacheData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final CacheConfiguration cacheCfg;
+
+ /** */
+ private final Integer cacheId;
+
+ /** */
+ private final CacheType cacheType;
+
+ /** */
+ private final IgniteUuid deploymentId;
+
+ /** */
+ private final QuerySchema schema;
+
+ /** */
+ private final UUID rcvdFrom;
+
+ /** */
+ private final boolean staticCfg;
+
+ /** */
+ private final boolean template;
+
+ /** Flags added for future usage. */
+ private final byte flags;
+
+ /**
+ * @param cacheCfg Cache configuration.
+ * @param cacheId Cache ID.
+ * @param cacheType Cache type.
+ * @param deploymentId Cache deployment ID.
+ * @param schema Query schema.
+ * @param rcvdFrom ID of the node that provided the cache configuration.
+ * @param staticCfg {@code True} if cache was statically configured.
+ * @param template {@code True} if this is cache template.
+ * @param flags Flags (added for future usage).
+ */
+ CacheData(CacheConfiguration cacheCfg,
+ int cacheId,
+ CacheType cacheType,
+ IgniteUuid deploymentId,
+ QuerySchema schema,
+ UUID rcvdFrom,
+ boolean staticCfg,
+ boolean template,
+ byte flags) {
+ assert cacheCfg != null;
+ assert rcvdFrom != null : cacheCfg.getName();
+ assert deploymentId != null : cacheCfg.getName();
+ assert template || cacheId != 0 : cacheCfg.getName();
+
+ this.cacheCfg = cacheCfg;
+ this.cacheId = cacheId;
+ this.cacheType = cacheType;
+ this.deploymentId = deploymentId;
+ this.schema = schema;
+ this.rcvdFrom = rcvdFrom;
+ this.staticCfg = staticCfg;
+ this.template = template;
+ this.flags = flags;
+ }
+
+ /**
+ * @return Cache ID.
+ */
+ public Integer cacheId() {
+ return cacheId;
+ }
+
+ /**
+ * @return {@code True} if this is template configuration.
+ */
+ public boolean template() {
+ return template;
+ }
+
+ /**
+ * @return Cache type.
+ */
+ public CacheType cacheType() {
+ return cacheType;
+ }
+
+ /**
+ * @return Cache deployment ID.
+ */
+ public IgniteUuid deploymentId() {
+ return deploymentId;
+ }
+
+ /**
+ * @return {@code True} if statically configured.
+ */
+ public boolean staticallyConfigured() {
+ return staticCfg;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ public CacheConfiguration cacheConfiguration() {
+ return cacheCfg;
+ }
+
+ /**
+ * @return Schema.
+ */
+ public QuerySchema schema() {
+ return schema.copy();
+ }
+
+ /**
+ * @return ID of the node that provided the cache configuration.
+ */
+ public UUID receivedFrom() {
+ return rcvdFrom;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheData.class, this, "cacheName", cacheCfg.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
new file mode 100644
index 0000000..ea24140
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Information about configured caches sent from joining node.
+ */
+class CacheJoinNodeDiscoveryData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ private final Map<String, CacheInfo> caches;
+
+ /** */
+ @GridToStringInclude
+ private final Map<String, CacheInfo> templates;
+
+ /** Deployment ID for started caches. */
+ @GridToStringInclude
+ private final IgniteUuid cacheDeploymentId;
+
+ /** {@code True} if required to start all caches on joining node. */
+ private final boolean startCaches;
+
+ /**
+ * @param cacheDeploymentId Deployment ID for started caches.
+ * @param caches Caches.
+ * @param templates Templates.
+ * @param startCaches {@code True} if required to start all caches on joining node.
+ */
+ CacheJoinNodeDiscoveryData(
+ IgniteUuid cacheDeploymentId,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates,
+ boolean startCaches) {
+ this.cacheDeploymentId = cacheDeploymentId;
+ this.caches = caches;
+ this.templates = templates;
+ this.startCaches = startCaches;
+ }
+
+ /**
+ * @return {@code True} if required to start all caches on joining node.
+ */
+ boolean startCaches() {
+ return startCaches;
+ }
+
+ /**
+ * @return Deployment ID assigned on joining node.
+ */
+ IgniteUuid cacheDeploymentId() {
+ return cacheDeploymentId;
+ }
+
+ /**
+ * @return Templates configured on joining node.
+ */
+ Map<String, CacheInfo> templates() {
+ return templates;
+ }
+
+ /**
+ * @return Caches configured on joining node.
+ */
+ Map<String, CacheInfo> caches() {
+ return caches;
+ }
+
+ /**
+ * Configuration and type of a single cache or template configured on the joining node.
+ */
+ static class CacheInfo implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ private final CacheConfiguration ccfg;
+
+ /** */
+ @GridToStringInclude
+ private final CacheType cacheType;
+
+ /** Flags added for future usage. */
+ private final byte flags;
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param cacheType Cache type.
+ * @param flags Flags (for future usage).
+ */
+ CacheInfo(CacheConfiguration ccfg, CacheType cacheType, byte flags) {
+ this.ccfg = ccfg;
+ this.cacheType = cacheType;
+ this.flags = flags;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ CacheConfiguration config() {
+ return ccfg;
+ }
+
+ /**
+ * @return Cache type.
+ */
+ CacheType cacheType() {
+ return cacheType;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheInfo.class, this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheJoinNodeDiscoveryData.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
new file mode 100644
index 0000000..84a33dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Cache information sent in discovery data to joining node.
+ */
+class CacheNodeCommonDiscoveryData implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ private final Map<String, CacheData> caches;
+
+ /** */
+ @GridToStringInclude
+ private final Map<String, CacheData> templates;
+
+ /** Information about cache client nodes. */
+ private final Map<String, Map<UUID, Boolean>> clientNodesMap;
+
+ /**
+ * @param caches Started caches.
+ * @param templates Configured templates.
+ * @param clientNodesMap Information about cache client nodes.
+ */
+ CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
+ Map<String, CacheData> templates,
+ Map<String, Map<UUID, Boolean>> clientNodesMap) {
+ this.caches = caches;
+ this.templates = templates;
+ this.clientNodesMap = clientNodesMap;
+ }
+
+ /**
+ * @return Started caches.
+ */
+ Map<String, CacheData> caches() {
+ return caches;
+ }
+
+ /**
+ * @return Configured templates.
+ */
+ Map<String, CacheData> templates() {
+ return templates;
+ }
+
+ /**
+ * @return Information about cache client nodes.
+ */
+ Map<String, Map<UUID, Boolean>> clientNodesMap() {
+ return clientNodesMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheNodeCommonDiscoveryData.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
new file mode 100644
index 0000000..28ec600
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheExistsException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
+
+/**
+ * Logic related to cache discovery data processing.
+ */
+class ClusterCachesInfo {
+ /** */
+ private final GridKernalContext ctx;
+
+ /** Dynamic caches. */
+ private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+ /** Cache templates. */
+ private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+
+ /** */
+ private CacheJoinNodeDiscoveryData joinDiscoData;
+
+ /** */
+ private CacheNodeCommonDiscoveryData gridData;
+
+ /** */
+ private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches;
+
+ /** */
+ private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
+
+ /**
+ * @param ctx Context.
+ */
+ ClusterCachesInfo(GridKernalContext ctx) {
+ this.ctx = ctx;
+
+ log = ctx.log(getClass());
+ }
+
+ /**
+ * @param joinDiscoData Information about configured caches and templates.
+ */
+ void onStart(CacheJoinNodeDiscoveryData joinDiscoData) {
+ this.joinDiscoData = joinDiscoData;
+
+ processJoiningNode(joinDiscoData, ctx.localNodeId());
+ }
+
+ /**
+ * @param checkConsistency {@code True} if cache configuration consistency needs to be checked.
+ * @throws IgniteCheckedException If failed.
+ */
+ void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
+ if (checkConsistency && joinDiscoData != null && gridData != null) {
+ for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
+ CacheConfiguration locCfg = locCacheInfo.config();
+
+ CacheData cacheData = gridData.caches().get(locCfg.getName());
+
+ if (cacheData != null)
+ checkCache(locCfg, cacheData.cacheConfiguration(), cacheData.receivedFrom());
+ }
+ }
+
+ joinDiscoData = null;
+ gridData = null;
+ }
+ /**
+ * Checks that the remote cache has a configuration compatible with the local one.
+ *
+ * @param locCfg Local configuration.
+ * @param rmtCfg Remote configuration.
+ * @param rmt Remote node.
+ * @throws IgniteCheckedException If check failed.
+ */
+ private void checkCache(CacheConfiguration<?, ?> locCfg, CacheConfiguration<?, ?> rmtCfg, UUID rmt)
+ throws IgniteCheckedException {
+ GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg);
+ GridCacheAttributes locAttr = new GridCacheAttributes(locCfg);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
+ locAttr.cacheMode(), rmtAttr.cacheMode(), true);
+
+ if (rmtAttr.cacheMode() != LOCAL) {
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor",
+ locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode",
+ "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
+ "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
+
+ ClusterNode rmtNode = ctx.discovery().node(rmt);
+
+ if (CU.affinityNode(ctx.discovery().localNode(), locCfg.getNodeFilter())
+ && rmtNode != null && CU.affinityNode(rmtNode, rmtCfg.getNodeFilter())) {
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
+ locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
+ }
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity",
+ locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper",
+ "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(),
+ rmtAttr.cacheAffinityMapperClassName(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount",
+ "Affinity partitions count", locAttr.affinityPartitionsCount(),
+ rmtAttr.affinityPartitionsCount(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter",
+ locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy",
+ locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup",
+ "Transaction manager lookup", locAttr.transactionManagerLookupClassName(),
+ rmtAttr.transactionManagerLookupClassName(), false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout",
+ "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize",
+ "Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode",
+ "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(),
+ true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize",
+ "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(),
+ false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled",
+ "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency",
+ "Write behind flush frequency", locAttr.writeBehindFlushFrequency(),
+ rmtAttr.writeBehindFlushFrequency(), false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize",
+ "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(),
+ false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount",
+ "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(),
+ rmtAttr.writeBehindFlushThreadCount(), false);
+
+ if (locAttr.cacheMode() == PARTITIONED) {
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy",
+ "Near eviction policy", locAttr.nearEvictionPolicyClassName(),
+ rmtAttr.nearEvictionPolicyClassName(), false);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors",
+ "Affinity include neighbors", locAttr.affinityIncludeNeighbors(),
+ rmtAttr.affinityIncludeNeighbors(), true);
+
+ CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups",
+ "Affinity key backups", locAttr.affinityKeyBackups(),
+ rmtAttr.affinityKeyBackups(), true);
+ }
+ }
+ }
+
+ /**
+ * @param batch Cache change request.
+ * @param topVer Topology version.
+ * @return {@code True} if minor topology version should be increased.
+ */
+ boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
+ ExchangeActions exchangeActions = new ExchangeActions();
+
+ boolean incMinorTopVer = false;
+
+ List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
+
+ final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
+
+ for (DynamicCacheChangeRequest req : batch.requests()) {
+ if (req.template()) {
+ CacheConfiguration ccfg = req.startCacheConfiguration();
+
+ assert ccfg != null : req;
+
+ DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
+
+ if (desc == null) {
+ DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
+ ccfg,
+ req.cacheType(),
+ true,
+ req.initiatingNodeId(),
+ false,
+ req.deploymentId(),
+ req.schema());
+
+ DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
+
+ assert old == null;
+
+ addedDescs.add(templateDesc);
+ }
+
+ ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+
+ continue;
+ }
+
+ DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName());
+
+ boolean needExchange = false;
+
+ AffinityTopologyVersion waitTopVer = null;
+
+ if (req.start()) {
+ if (desc == null) {
+ if (req.clientStartOnly()) {
+ ctx.cache().completeCacheStartFuture(req, new IgniteCheckedException("Failed to start " +
+ "client cache (a cache with the given name is not started): " + req.cacheName()));
+ }
+ else {
+ CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
+
+ assert req.cacheType() != null : req;
+ assert F.eq(ccfg.getName(), req.cacheName()) : req;
+
+ DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
+ ccfg,
+ req.cacheType(),
+ false,
+ req.initiatingNodeId(),
+ false,
+ req.deploymentId(),
+ req.schema());
+
+ DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
+
+ assert old == null;
+
+ ctx.discovery().setCacheFilter(
+ ccfg.getName(),
+ ccfg.getNodeFilter(),
+ ccfg.getNearConfiguration() != null,
+ ccfg.getCacheMode());
+
+ ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+
+ addedDescs.add(startDesc);
+
+ exchangeActions.addCacheToStart(req, startDesc);
+
+ needExchange = true;
+ }
+ }
+ else {
+ assert req.initiatingNodeId() != null : req;
+
+ // Cache already exists, exchange is needed only if client cache should be created.
+ ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
+
+ boolean clientReq = node != null &&
+ !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+
+ if (req.clientStartOnly()) {
+ needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+ }
+ else {
+ if (req.failIfExists()) {
+ ctx.cache().completeCacheStartFuture(req,
+ new CacheExistsException("Failed to start cache " +
+ "(a cache with the same name is already started): " + req.cacheName()));
+ }
+ else {
+ needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+ req.initiatingNodeId(),
+ req.nearCacheConfiguration() != null);
+ }
+ }
+
+ if (needExchange) {
+ req.clientStartOnly(true);
+
+ desc.clientCacheStartVersion(topVer.nextMinorVersion());
+
+ exchangeActions.addClientCacheToStart(req, desc);
+ }
+ }
+
+ if (!needExchange && desc != null) {
+ if (desc.clientCacheStartVersion() != null)
+ waitTopVer = desc.clientCacheStartVersion();
+ else
+ waitTopVer = desc.startTopologyVersion();
+ }
+ }
+ else if (req.globalStateChange())
+ exchangeActions.newClusterState(req.state());
+ else if (req.resetLostPartitions()) {
+ if (desc != null) {
+ needExchange = true;
+
+ exchangeActions.addCacheToResetLostPartitions(req, desc);
+ }
+ }
+ else if (req.stop()) {
+ assert req.stop() ^ req.close() : req;
+
+ if (desc != null) {
+ DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
+
+ assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
+
+ ctx.discovery().removeCacheFilter(req.cacheName());
+
+ needExchange = true;
+
+ exchangeActions.addCacheToStop(req, desc);
+ }
+ }
+ else if (req.close()) {
+ if (desc != null) {
+ needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
+
+ if (needExchange)
+ exchangeActions.addCacheToClose(req, desc);
+ }
+ }
+ else
+ assert false : req;
+
+ if (!needExchange) {
+ if (req.initiatingNodeId().equals(ctx.localNodeId()))
+ reqsToComplete.add(new T2<>(req, waitTopVer));
+ }
+ else
+ incMinorTopVer = true;
+ }
+
+ if (!F.isEmpty(addedDescs)) {
+ AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer;
+
+ for (DynamicCacheDescriptor desc : addedDescs) {
+ assert desc.template() || incMinorTopVer;
+
+ desc.startTopologyVersion(startTopVer);
+ }
+ }
+
+ if (!F.isEmpty(reqsToComplete)) {
+ ctx.closure().callLocalSafe(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t :reqsToComplete) {
+ final DynamicCacheChangeRequest req = t.get1();
+ AffinityTopologyVersion waitTopVer = t.get2();
+
+ IgniteInternalFuture<?> fut = waitTopVer != null ?
+ ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
+
+ if (fut == null || fut.isDone())
+ ctx.cache().completeCacheStartFuture(req, null);
+ else {
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ ctx.cache().completeCacheStartFuture(req, null);
+ }
+ });
+ }
+ }
+
+ return null;
+ }
+ });
+ }
+
+ if (incMinorTopVer) {
+ assert !exchangeActions.empty() : exchangeActions;
+
+ batch.exchangeActions(exchangeActions);
+ }
+
+ return incMinorTopVer;
+ }
+
+ /**
+ * @param dataBag Discovery data bag.
+ */
+ void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ if (!ctx.isDaemon())
+ dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData());
+ }
+
+ /**
+ * @return Discovery data sent on local node join.
+ */
+ private Serializable joinDiscoveryData() {
+ if (cachesOnDisconnect != null) {
+ Map<String, CacheClientReconnectDiscoveryData.CacheInfo> cachesInfo = new HashMap<>();
+
+ for (IgniteInternalCache cache : ctx.cache().caches()) {
+ DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name());
+
+ assert desc != null : cache.name();
+
+ cachesInfo.put(cache.name(), new CacheClientReconnectDiscoveryData.CacheInfo(desc.cacheConfiguration(),
+ desc.cacheType(),
+ desc.deploymentId(),
+ cache.context().isNear(),
+ (byte)0));
+ }
+
+ return new CacheClientReconnectDiscoveryData(cachesInfo);
+ }
+ else {
+ assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active();
+
+ return joinDiscoData;
+ }
+ }
+
+ /**
+ * Called from exchange worker. Hands over the list prepared for the local join and resets the
+ * field, so the list is returned only once. Daemon nodes always get an empty list.
+ *
+ * @return Caches to be started when this node starts.
+ */
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() {
+ if (ctx.isDaemon())
+ return Collections.emptyList();
+
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> toStart = locJoinStartCaches;
+
+ assert toStart != null;
+
+ locJoinStartCaches = null;
+
+ return toStart;
+ }
+
+ /**
+ * Collects statically configured caches whose descriptor was received from the given node.
+ * On daemon nodes the result is always empty.
+ *
+ * @param joinedNodeId Joined node ID.
+ * @return New caches received from joined node.
+ */
+ List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
+ assert joinedNodeId != null;
+
+ if (ctx.isDaemon())
+ return Collections.emptyList();
+
+ List<DynamicCacheDescriptor> res = null;
+
+ for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+ if (!cacheDesc.staticallyConfigured())
+ continue;
+
+ assert cacheDesc.receivedFrom() != null : cacheDesc;
+
+ if (!joinedNodeId.equals(cacheDesc.receivedFrom()))
+ continue;
+
+ // Allocate lazily: most joins bring no new caches.
+ if (res == null)
+ res = new ArrayList<>();
+
+ res.add(cacheDesc);
+ }
+
+ if (res == null)
+ return Collections.emptyList();
+
+ return res;
+ }
+
+ /**
+ * Discovery event callback, executed from discovery thread. Only node join events on non-daemon
+ * nodes are handled: descriptors received from the joined node get the join topology version, and
+ * if the joined node is the local one and no grid data was received, caches for local join are
+ * initialized.
+ *
+ * @param type Event type.
+ * @param node Event node.
+ * @param topVer Topology version.
+ */
+ void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+ if (type != EVT_NODE_JOINED || ctx.isDaemon())
+ return;
+
+ for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+ if (node.id().equals(cacheDesc.receivedFrom()))
+ cacheDesc.receivedFromStartVersion(topVer);
+ }
+
+ for (DynamicCacheDescriptor tmplDesc : registeredTemplates.values()) {
+ if (node.id().equals(tmplDesc.receivedFrom()))
+ tmplDesc.receivedFromStartVersion(topVer);
+ }
+
+ boolean locJoin = node.id().equals(ctx.discovery().localNode().id());
+
+ if (locJoin && gridData == null) { // First node starts.
+ assert joinDiscoData != null || !ctx.state().active();
+
+ initStartCachesForLocalJoin(true);
+ }
+ }
+
+ /**
+ * Adds common cache discovery data to the bag, unless the local node is a daemon or common data
+ * for the cache processor has already been collected.
+ *
+ * @param dataBag Discovery data bag.
+ */
+ void collectGridNodeData(DiscoveryDataBag dataBag) {
+ int procId = CACHE_PROC.ordinal();
+
+ if (!ctx.isDaemon() && !dataBag.commonDataCollectedFor(procId))
+ dataBag.addGridCommonData(procId, collectCommonDiscoveryData());
+ }
+
+ /**
+ * Creates a snapshot of all registered caches and templates, keyed by cache name, together with
+ * the client nodes map taken from discovery. Templates are recorded with cache ID {@code 0}.
+ *
+ * @return Information about started caches.
+ */
+ private CacheNodeCommonDiscoveryData collectCommonDiscoveryData() {
+ Map<String, CacheData> cachesData = new HashMap<>();
+
+ for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+ CacheData data = new CacheData(
+ cacheDesc.cacheConfiguration(), cacheDesc.cacheId(), cacheDesc.cacheType(),
+ cacheDesc.deploymentId(), cacheDesc.schema(), cacheDesc.receivedFrom(),
+ cacheDesc.staticallyConfigured(), false, (byte)0);
+
+ cachesData.put(cacheDesc.cacheName(), data);
+ }
+
+ Map<String, CacheData> templatesData = new HashMap<>();
+
+ for (DynamicCacheDescriptor tmplDesc : registeredTemplates.values()) {
+ CacheData data = new CacheData(
+ tmplDesc.cacheConfiguration(), 0, tmplDesc.cacheType(),
+ tmplDesc.deploymentId(), tmplDesc.schema(), tmplDesc.receivedFrom(),
+ tmplDesc.staticallyConfigured(), true, (byte)0);
+
+ templatesData.put(tmplDesc.cacheName(), data);
+ }
+
+ return new CacheNodeCommonDiscoveryData(cachesData, templatesData, ctx.discovery().clientNodesMap());
+ }
+
+ /**
+ * Handles common cache discovery data received from the grid. A descriptor is registered for every
+ * template and every cache listed in the data (overwriting any entry with the same name), the
+ * discovery cache filter is set for each cache, and the received client nodes map is replayed into
+ * discovery. The data is then stored in {@code gridData}. Finally, unless this node is in
+ * disconnected state, the list of caches to start on local join is computed; in disconnected state
+ * that list is set to empty.
+ * <p>
+ * Does nothing on daemon nodes or when the bag carries no common data.
+ *
+ * @param data Discovery data.
+ */
+ void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+ if (ctx.isDaemon() || data.commonData() == null)
+ return;
+
+ assert joinDiscoData != null || disconnectedState() || !ctx.state().active();
+ assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
+
+ CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
+
+ for (CacheData cacheData : cachesData.templates().values()) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ cacheData.cacheConfiguration(),
+ cacheData.cacheType(),
+ true,
+ cacheData.receivedFrom(),
+ cacheData.staticallyConfigured(),
+ cacheData.deploymentId(),
+ cacheData.schema());
+
+ registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
+ }
+
+ for (CacheData cacheData : cachesData.caches().values()) {
+ CacheConfiguration<?, ?> cfg = cacheData.cacheConfiguration();
+
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+ ctx,
+ cacheData.cacheConfiguration(),
+ cacheData.cacheType(),
+ false,
+ cacheData.receivedFrom(),
+ cacheData.staticallyConfigured(),
+ cacheData.deploymentId(),
+ cacheData.schema());
+
+ desc.receivedOnDiscovery(true);
+
+ registeredCaches.put(cacheData.cacheConfiguration().getName(), desc);
+
+ ctx.discovery().setCacheFilter(
+ cfg.getName(),
+ cfg.getNodeFilter(),
+ cfg.getNearConfiguration() != null,
+ cfg.getCacheMode());
+ }
+
+ if (!F.isEmpty(cachesData.clientNodesMap())) {
+ for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) {
+ String cacheName = entry.getKey();
+
+ for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
+ ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
+ }
+ }
+
+ gridData = cachesData;
+
+ if (!disconnectedState())
+ initStartCachesForLocalJoin(false);
+ else
+ locJoinStartCaches = Collections.emptyList();
+ }
+
+ /**
+ * Fills {@code locJoinStartCaches} with registered caches that should be started on this node.
+ * The list must not have been initialized before. Nothing is added if {@code joinDiscoData} is
+ * {@code null}.
+ * <p>
+ * When {@code firstNode} is {@code true}, caches absent from this node's join data are skipped.
+ * If the join data holds a local configuration for a cache, a new descriptor built on that local
+ * configuration is used instead of the registered one (start, received-from and client start
+ * versions are copied over) and the local near configuration is passed along. A cache is added
+ * when it has a local configuration, when {@code joinDiscoData.startCaches()} is set, or when
+ * {@code CU.affinityNode} accepts the local node for the cache node filter.
+ *
+ * @param firstNode {@code True} if first node in cluster starts.
+ */
+ private void initStartCachesForLocalJoin(boolean firstNode) {
+ assert locJoinStartCaches == null;
+
+ locJoinStartCaches = new ArrayList<>();
+
+ if (joinDiscoData != null) {
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
+ continue;
+
+ CacheConfiguration<?, ?> cfg = desc.cacheConfiguration();
+
+ CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
+
+ NearCacheConfiguration nearCfg = null;
+
+ if (locCfg != null) {
+ nearCfg = locCfg.config().getNearConfiguration();
+
+ DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
+ locCfg.config(),
+ desc.cacheType(),
+ desc.template(),
+ desc.receivedFrom(),
+ desc.staticallyConfigured(),
+ desc.deploymentId(),
+ desc.schema());
+
+ desc0.startTopologyVersion(desc.startTopologyVersion());
+ desc0.receivedFromStartVersion(desc.receivedFromStartVersion());
+ desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
+
+ desc = desc0;
+ }
+
+ if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()))
+ locJoinStartCaches.add(new T2<>(desc, nearCfg));
+ }
+ }
+ }
+
+ /**
+ * Dispatches data received from a joining node. Client reconnect data is processed immediately,
+ * or queued in {@code clientReconnectReqs} while the local node itself is in disconnected state.
+ * Regular join data is passed to {@code processJoiningNode}. Other payload types are ignored.
+ *
+ * @param data Joining node data.
+ */
+ void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
+ if (!data.hasJoiningNodeData())
+ return;
+
+ Serializable payload = data.joiningNodeData();
+
+ if (payload instanceof CacheClientReconnectDiscoveryData) {
+ CacheClientReconnectDiscoveryData reconnectData = (CacheClientReconnectDiscoveryData)payload;
+
+ if (!disconnectedState()) {
+ processClientReconnectData(reconnectData, data.joiningNodeId());
+
+ return;
+ }
+
+ if (clientReconnectReqs == null)
+ clientReconnectReqs = new LinkedHashMap<>();
+
+ clientReconnectReqs.put(data.joiningNodeId(), reconnectData);
+ }
+ else if (payload instanceof CacheJoinNodeDiscoveryData)
+ processJoiningNode((CacheJoinNodeDiscoveryData)payload, data.joiningNodeId());
+ }
+
+ /**
+ * Registers a reconnected client in discovery for each cache it reported. For caches that survive
+ * reconnect the near flag is always {@code false}; for any other cache the client is registered
+ * only if the cache is still registered with the same deployment ID, using the reported near flag.
+ *
+ * @param clientData Discovery data.
+ * @param clientNodeId Client node ID.
+ */
+ private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
+ for (CacheClientReconnectDiscoveryData.CacheInfo info : clientData.clientCaches().values()) {
+ String name = info.config().getName();
+
+ boolean nearFlag;
+
+ if (surviveReconnect(name))
+ nearFlag = false;
+ else {
+ DynamicCacheDescriptor curDesc = registeredCaches.get(name);
+
+ // Cache was stopped or re-created while the client was away.
+ if (curDesc == null || !curDesc.deploymentId().equals(info.deploymentId()))
+ continue;
+
+ nearFlag = info.nearCache();
+ }
+
+ ctx.discovery().addClientNode(name, clientNodeId, nearFlag);
+ }
+ }
+
+ /**
+ * Processes cache data sent by a joining node. Templates and caches from the join data that are
+ * not registered yet get new descriptors created with {@code nodeId} as the node they were
+ * received from and the join data's deployment ID; for each newly registered cache the discovery
+ * cache filter is set. Already registered names are left untouched.
+ * <p>
+ * {@code addClientNode} is called for the joining node for every cache in its join data, whether
+ * newly registered or not, and, if the join data's {@code startCaches()} flag is set, for every
+ * registered cache.
+ *
+ * @param joinData Joined node discovery data.
+ * @param nodeId Joined node ID.
+ */
+ private void processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
+ CacheConfiguration<?, ?> cfg = cacheInfo.config();
+
+ if (!registeredTemplates.containsKey(cfg.getName())) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+ cfg,
+ cacheInfo.cacheType(),
+ true,
+ nodeId,
+ true,
+ joinData.cacheDeploymentId(),
+ new QuerySchema(cfg.getQueryEntities()));
+
+ DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
+
+ assert old == null : old;
+ }
+ }
+
+ for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
+ CacheConfiguration<?, ?> cfg = cacheInfo.config();
+
+ if (!registeredCaches.containsKey(cfg.getName())) {
+ DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
+ cfg,
+ cacheInfo.cacheType(),
+ false,
+ nodeId,
+ true,
+ joinData.cacheDeploymentId(),
+ new QuerySchema(cfg.getQueryEntities()));
+
+ DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
+
+ assert old == null : old;
+
+ ctx.discovery().setCacheFilter(
+ cfg.getName(),
+ cfg.getNodeFilter(),
+ cfg.getNearConfiguration() != null,
+ cfg.getCacheMode());
+ }
+
+ ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
+ }
+
+ if (joinData.startCaches()) {
+ for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+ ctx.discovery().addClientNode(desc.cacheName(),
+ nodeId,
+ desc.cacheConfiguration().getNearConfiguration() != null);
+ }
+ }
+ }
+
+ /**
+ * Gives direct access to the internal map of cache descriptors keyed by cache name. The map
+ * itself is returned, not a copy, so changes made by the caller are visible here.
+ *
+ * @return Registered caches.
+ */
+ ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
+ return registeredCaches;
+ }
+
+ /**
+ * Gives direct access to the internal map of cache template descriptors keyed by name. The map
+ * itself is returned, not a copy, so changes made by the caller are visible here.
+ *
+ * @return Registered cache templates.
+ */
+ ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
+ return registeredTemplates;
+ }
+
+ /**
+ * Handles disconnect of the local node: a copy of the currently registered caches is saved in
+ * {@code cachesOnDisconnect} (which also marks the disconnected state), registered caches and
+ * templates are cleared, and any queued client reconnect requests are dropped.
+ */
+ void onDisconnect() {
+ cachesOnDisconnect = new HashMap<>(registeredCaches);
+
+ registeredCaches.clear();
+ registeredTemplates.clear();
+
+ clientReconnectReqs = null;
+ }
+
+ /**
+ * Completes reconnect of the local node. A cache from the disconnect snapshot is reported as
+ * stopped if it does not survive reconnect unconditionally and is either no longer registered or
+ * registered with a different deployment ID. Queued client reconnect requests are then processed
+ * and the disconnected state is reset.
+ *
+ * @return Stopped caches names.
+ */
+ Set<String> onReconnected() {
+ assert disconnectedState();
+
+ Set<String> stoppedNames = new HashSet<>();
+
+ for (Map.Entry<String, DynamicCacheDescriptor> snapshotEntry : cachesOnDisconnect.entrySet()) {
+ String name = snapshotEntry.getKey();
+
+ if (surviveReconnect(name))
+ continue;
+
+ DynamicCacheDescriptor oldDesc = snapshotEntry.getValue();
+ DynamicCacheDescriptor curDesc = registeredCaches.get(name);
+
+ if (curDesc == null || !oldDesc.deploymentId().equals(curDesc.deploymentId()))
+ stoppedNames.add(name);
+ }
+
+ if (clientReconnectReqs != null) {
+ for (Map.Entry<UUID, CacheClientReconnectDiscoveryData> req : clientReconnectReqs.entrySet())
+ processClientReconnectData(req.getValue(), req.getKey());
+
+ clientReconnectReqs = null;
+ }
+
+ cachesOnDisconnect = null;
+
+ return stoppedNames;
+ }
+
+ /**
+ * The disconnected state is tracked by the presence of the {@code cachesOnDisconnect} snapshot.
+ *
+ * @return {@code True} if client node is currently in disconnected state.
+ */
+ private boolean disconnectedState() {
+ return cachesOnDisconnect != null;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return {@code True} if the cache with the given name is a system cache (utility or atomics cache)
+ * which should always survive client node disconnect.
+ */
+ private boolean surviveReconnect(String cacheName) {
+ return CU.isUtilityCache(cacheName) || CU.isAtomicsCache(cacheName);
+ }
+
+ /**
+ * Removes all registered cache descriptors. Registered templates are not touched.
+ */
+ void clearCaches() {
+ registeredCaches.clear();
+ }
+}
[3/6] ignite git commit: Moved logic related to caches discovery data
handling to ClusterCachesInfo. Start of statically configured caches in the
same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 87aaee0..06ad62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -26,7 +25,6 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
@@ -61,13 +59,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.query.QuerySchema;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
-import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask;
-import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
-import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
-import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -110,14 +101,23 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
+import org.apache.ignite.internal.processors.query.QuerySchema;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask;
+import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -153,7 +153,6 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
@@ -193,11 +192,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Template configuration add futures. */
private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();
- /** Dynamic caches. */
- private ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
-
- /** Cache templates. */
- private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
+ /** */
+ private ClusterCachesInfo cachesInfo;
/** */
private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
@@ -208,12 +204,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Count down latch for caches. */
private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
- /** */
- private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
-
- /** */
- private Map<UUID, DynamicCacheChangeBatch> clientReconnectReqs;
-
/** Internal cache names. */
private final Set<String> internalCaches;
@@ -391,16 +381,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cc Cache Configuration.
* @return {@code true} if cache is starting on client node and this node is affinity node for the cache.
*/
- private boolean storesLocallyOnClient(IgniteConfiguration c,
- CacheConfiguration cc) {
+ private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) {
if (c.isClientMode() && c.getMemoryConfiguration() == null) {
if (cc.getCacheMode() == LOCAL)
return true;
- if (ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName()))
- return true;
+ return ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName());
- return false;
}
else
return false;
@@ -627,6 +614,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+ cachesInfo = new ClusterCachesInfo(ctx);
+
DeploymentMode depMode = ctx.config().getDeploymentMode();
if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
@@ -647,72 +636,42 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (GridCacheSharedManager mgr : sharedCtx.managers())
mgr.start(sharedCtx);
- //if inActivate on start then skip registrate caches
- if (!activeOnStart)
- return;
-
- CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
-
- registerCacheFromConfig(cfgs);
+ if (activeOnStart && !ctx.config().isDaemon()) {
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches = new HashMap<>();
- registerCacheFromPersistentStore(cfgs);
-
- if (log.isDebugEnabled())
- log.debug("Started cache processor.");
- }
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates = new HashMap<>();
- /**
- * @param cfgs Cache configurations.
- * @throws IgniteCheckedException If failed.
- */
- private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException {
- for (int i = 0; i < cfgs.length; i++) {
- if (ctx.config().isDaemon())
- continue;
+ addCacheOnJoinFromConfig(caches, templates);
- CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
+ addCacheOnJoinFromPersistentStore(caches, templates);
- cfgs[i] = cfg; // Replace original configuration value.
+ CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(),
+ caches,
+ templates,
+ startAllCachesOnClientStart());
- registerCache(cfg);
+ cachesInfo.onStart(discoData);
}
- }
-
- /**
- * @param cfgs Cache configurations.
- * @throws IgniteCheckedException If failed.
- */
- private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException {
- if (sharedCtx.pageStore() != null &&
- sharedCtx.database().persistenceEnabled() &&
- !ctx.config().isDaemon()) {
-
- Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
-
- for (CacheConfiguration cfg : cfgs)
- savedCacheNames.remove(cfg.getName());
-
- for (String name : internalCaches)
- savedCacheNames.remove(name);
-
- if (!F.isEmpty(savedCacheNames)) {
- log.info("Registrate persistent caches: " + savedCacheNames);
-
- for (String name : savedCacheNames) {
- CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
-
- if (cfg != null)
- registerCache(cfg);
- }
- }
+ else {
+ cachesInfo.onStart(new CacheJoinNodeDiscoveryData(IgniteUuid.randomUuid(),
+ Collections.<String, CacheJoinNodeDiscoveryData.CacheInfo>emptyMap(),
+ Collections.<String, CacheJoinNodeDiscoveryData.CacheInfo>emptyMap(),
+ false));
}
+
+ if (log.isDebugEnabled())
+ log.debug("Started cache processor.");
}
/**
* @param cfg Cache configuration.
+ * @param caches Caches map.
+ * @param templates Templates map.
* @throws IgniteCheckedException If failed.
*/
- private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
+ private void addCacheOnJoin(CacheConfiguration cfg,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) throws IgniteCheckedException {
CU.validateCacheName(cfg.getName());
cloneCheckSerializable(cfg);
@@ -722,75 +681,88 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Initialize defaults.
initialize(cfg, cacheObjCtx);
- String cacheName = cfg.getName();
+ boolean template = cfg.getName().endsWith("*");
- if (cacheDescriptor(cfg.getName()) != null) {
- throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
- "assign unique name to each cache): " + cacheName);
- }
+ if (!template) {
+ if (caches.containsKey(cfg.getName())) {
+ throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
+ "assign unique name to each cache): " + cfg.getName());
+ }
- CacheType cacheType;
+ CacheType cacheType;
- if (CU.isUtilityCache(cfg.getName()))
+ if (CU.isUtilityCache(cfg.getName()))
cacheType = CacheType.UTILITY;
else if (internalCaches.contains(cfg.getName()))
cacheType = CacheType.INTERNAL;
else
cacheType = CacheType.USER;
- if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
- cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
+ if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null)
+ cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName());
- boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
+ if (!cacheType.userCache())
+ stopSeq.addLast(cfg.getName());
+ else
+ stopSeq.addFirst(cfg.getName());
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
- cfg,
- cacheType,
- template,
- IgniteUuid.randomUuid(),
- new QuerySchema(cfg.getQueryEntities()));
+ caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, (byte)0));
+ }
+ else
+ templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, (byte)0));
+ }
- desc.locallyConfigured(true);
- desc.staticallyConfigured(true);
- desc.receivedFrom(ctx.localNodeId());
+ /**
+ * @param caches Caches map.
+ * @param templates Templates map.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void addCacheOnJoinFromConfig(
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+ ) throws IgniteCheckedException {
+ assert !ctx.config().isDaemon();
- if (!template) {
- cacheDescriptor(cfg.getName(), desc);
+ CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
- ctx.discovery().setCacheFilter(
- cfg.getName(),
- cfg.getNodeFilter(),
- cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
- cfg.getCacheMode());
+ for (int i = 0; i < cfgs.length; i++) {
+ CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
- ctx.discovery().addClientNode(cfg.getName(),
- ctx.localNodeId(),
- cfg.getNearConfiguration() != null);
+ cfgs[i] = cfg; // Replace original configuration value.
- if (!cacheType.userCache())
- stopSeq.addLast(cfg.getName());
- else
- stopSeq.addFirst(cfg.getName());
+ addCacheOnJoin(cfg, caches, templates);
}
- else {
- if (log.isDebugEnabled())
- log.debug("Use cache configuration as template: " + cfg);
+ }
- registeredTemplates.put(cacheName, desc);
- }
+ /**
+ * @param caches Caches map.
+ * @param templates Templates map.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void addCacheOnJoinFromPersistentStore(
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
+ Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates
+ ) throws IgniteCheckedException {
+ assert !ctx.config().isDaemon();
- if (cfg.getName() == null) { // Use cache configuration with null name as template.
- DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
- cfg,
- cacheType,
- true,
- IgniteUuid.randomUuid(),
- new QuerySchema(cfg.getQueryEntities()));
+ if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) {
+ Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
+
+ savedCacheNames.removeAll(caches.keySet());
+
+ savedCacheNames.removeAll(internalCaches);
- desc0.locallyConfigured(true);
- desc0.staticallyConfigured(true);
+ if (!F.isEmpty(savedCacheNames)) {
+ if (log.isInfoEnabled())
+ log.info("Register persistent caches: " + savedCacheNames);
+
+ for (String name : savedCacheNames) {
+ CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
- registeredTemplates.put(cacheName, desc0);
+ if (cfg != null)
+ addCacheOnJoin(cfg, caches, templates);
+ }
+ }
}
}
@@ -819,7 +791,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ClusterNode locNode = ctx.discovery().localNode();
try {
- checkConsistency();
+ boolean checkConsistency =
+ !ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
+
+ if (checkConsistency)
+ checkConsistency();
+
+ cachesInfo.onKernalStart(checkConsistency);
boolean currStatus = ctx.state().active();
@@ -865,88 +843,47 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.query().onCacheKernalStart();
- // Start dynamic caches received from collect discovery data.
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- if (ctx.config().isDaemon())
- continue;
-
- desc.clearRemoteConfigurations();
-
- CacheConfiguration ccfg = desc.cacheConfiguration();
-
- IgnitePredicate filter = ccfg.getNodeFilter();
-
- boolean loc = desc.locallyConfigured();
-
- if (loc || (desc.receivedOnDiscovery() &&
- (startAllCachesOnClientStart() || CU.affinityNode(locNode, filter)))) {
- boolean started = desc.onStart();
-
- assert started : "Failed to change started flag for locally configured cache: " + desc;
-
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
-
- CachePluginManager pluginMgr = desc.pluginManager();
-
- GridCacheContext ctx = createCache(
- ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
-
- ctx.dynamicDeploymentId(desc.deploymentId());
-
- sharedCtx.addCacheContext(ctx);
-
- GridCacheAdapter cache = ctx.cache();
-
- String name = ccfg.getName();
-
- caches.put(name, cache);
-
- startCache(cache, desc.schema());
-
- jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false));
- }
+ for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) {
+ if (sharedCtx.database() != mgr)
+ mgr.onKernalStart(false);
}
}
finally {
cacheStartedLatch.countDown();
}
- // Must call onKernalStart on shared managers after creation of fetched caches.
- for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
- if (sharedCtx.database() != mgr)
- mgr.onKernalStart(false);
-
// Escape if start active on start false
if (!activeOnStart)
return;
- for (GridCacheAdapter<?, ?> cache : caches.values())
- onKernalStart(cache);
-
if (!ctx.config().isDaemon())
ctx.cacheObjects().onUtilityCacheStarted();
ctx.service().onUtilityCacheStarted();
- // Wait for caches in SYNC preload mode.
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- CacheConfiguration cfg = desc.cacheConfiguration();
+ final AffinityTopologyVersion startTopVer =
+ new AffinityTopologyVersion(ctx.discovery().localJoinEvent().topologyVersion(), 0);
- IgnitePredicate filter = cfg.getNodeFilter();
+ final List<IgniteInternalFuture> syncFuts = new ArrayList<>(caches.size());
- if (desc.locallyConfigured() || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
- GridCacheAdapter cache = caches.get(cfg.getName());
+ sharedCtx.forAllCaches(new CIX1<GridCacheContext>() {
+ @Override public void applyx(GridCacheContext cctx) throws IgniteCheckedException {
+ CacheConfiguration cfg = cctx.config();
- if (cache != null) {
- if (cfg.getRebalanceMode() == SYNC) {
- CacheMode cacheMode = cfg.getCacheMode();
+ if (cctx.affinityNode() &&
+ cfg.getRebalanceMode() == SYNC &&
+ startTopVer.equals(cctx.startTopologyVersion())) {
+ CacheMode cacheMode = cfg.getCacheMode();
- if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
- cache.preloader().syncFuture().get();
- }
+ if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+ // Need to wait outside to avoid a deadlock
+ syncFuts.add(cctx.preloader().syncFuture());
}
}
- }
+ });
+
+ for (int i = 0, size = syncFuts.size(); i < size; i++)
+ syncFuts.get(i).get();
assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
@@ -962,38 +899,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- *
+ * @throws IgniteCheckedException if check failed.
*/
private void checkConsistency() throws IgniteCheckedException {
- if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
- for (ClusterNode n : ctx.discovery().remoteNodes()) {
- if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
- continue;
-
- checkTransactionConfiguration(n);
-
- DeploymentMode locDepMode = ctx.config().getDeploymentMode();
- DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
-
- CU.checkAttributeMismatch(
- log, null, n.id(), "deploymentMode", "Deployment mode",
- locDepMode, rmtDepMode, true);
-
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
-
- if (rmtCfg != null) {
- CacheConfiguration locCfg = desc.cacheConfiguration();
+ for (ClusterNode n : ctx.discovery().remoteNodes()) {
+ if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
+ continue;
- checkCache(locCfg, rmtCfg, n);
+ checkTransactionConfiguration(n);
- // Check plugin cache configurations.
- CachePluginManager pluginMgr = desc.pluginManager();
+ DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+ DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
- pluginMgr.validateRemotes(rmtCfg, n);
- }
- }
- }
+ CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+ locDepMode, rmtDepMode, true);
}
}
@@ -1034,7 +953,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
stopCache(cache, cancel, false);
}
- registeredCaches.clear();
+ cachesInfo.clearCaches();
}
/**
@@ -1105,8 +1024,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
- cachesOnDisconnect = new HashMap<>(registeredCaches);
-
IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
@@ -1133,9 +1050,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.onDisconnected(reconnectFut);
- registeredCaches.clear();
-
- registeredTemplates.clear();
+ cachesInfo.onDisconnect();
}
/** {@inheritDoc} */
@@ -1144,24 +1059,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridCompoundFuture<?, ?> stopFut = null;
- for (final GridCacheAdapter cache : caches.values()) {
- String name = cache.name();
-
- boolean stopped;
-
- boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
+ Set<String> stoppedCaches = cachesInfo.onReconnected();
- if (!sysCache) {
- DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(name);
-
- assert oldDesc != null : "No descriptor for cache: " + name;
-
- DynamicCacheDescriptor newDesc = cacheDescriptor(name);
-
- stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId());
- }
- else
- stopped = false;
+ for (final GridCacheAdapter cache : caches.values()) {
+ boolean stopped = stoppedCaches.contains(cache.name());
if (stopped) {
cache.context().gate().reconnected(true);
@@ -1188,11 +1089,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
reconnected.add(cache);
- if (!sysCache) {
+ if (cache.context().userCache()) {
// Re-create cache structures inside indexing in order to apply recent schema changes.
GridCacheContext cctx = cache.context();
- DynamicCacheDescriptor desc = cacheDescriptor(name);
+ DynamicCacheDescriptor desc = cacheDescriptor(cctx.name());
assert desc != null;
@@ -1202,20 +1103,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- if (clientReconnectReqs != null) {
- for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet())
- processClientReconnectData(e.getKey(), e.getValue());
-
- clientReconnectReqs = null;
- }
-
sharedCtx.onReconnected();
for (GridCacheAdapter cache : reconnected)
cache.context().gate().reconnected(false);
- cachesOnDisconnect = null;
-
if (stopFut != null)
stopFut.markInitialized();
@@ -1438,16 +1330,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cfg Cache configuration to use to create cache.
* @param pluginMgr Cache plugin manager.
- * @param cacheType Cache type.
+ * @param desc Cache descriptor.
+ * @param locStartTopVer Current topology version.
* @param cacheObjCtx Cache object context.
+ * @param affNode {@code True} if local node is an affinity node.
* @param updatesAllowed Updates allowed flag.
* @return Cache context.
* @throws IgniteCheckedException If failed to create cache.
*/
private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
@Nullable CachePluginManager pluginMgr,
- CacheType cacheType,
+ DynamicCacheDescriptor desc,
+ AffinityTopologyVersion locStartTopVer,
CacheObjectContext cacheObjCtx,
+ boolean affNode,
boolean updatesAllowed)
throws IgniteCheckedException {
assert cfg != null;
@@ -1465,7 +1361,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
QueryUtils.prepareCacheConfiguration(cfg);
- validate(ctx.config(), cfg, cacheType, cfgStore);
+ validate(ctx.config(), cfg, desc.cacheType(), cfgStore);
if (pluginMgr == null)
pluginMgr = new CachePluginManager(ctx, cfg);
@@ -1475,7 +1371,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.jta().registerCache(cfg);
// Skip suggestions for internal caches.
- if (cacheType.userCache())
+ if (desc.cacheType().userCache())
suggestOptimizations(cfg, cfgStore != null);
Collection<Object> toPrepare = new ArrayList<>();
@@ -1508,8 +1404,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
storeMgr.initialize(cfgStore, sesHolders);
- boolean affNode = cfg.getCacheMode() == LOCAL || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter());
-
String memPlcName = cfg.getMemoryPolicyName();
MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName);
@@ -1520,7 +1414,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx,
sharedCtx,
cfg,
- cacheType,
+ desc.cacheType(),
+ locStartTopVer,
+ desc.receivedFrom(),
affNode,
updatesAllowed,
memPlc,
@@ -1651,7 +1547,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx,
sharedCtx,
cfg,
- cacheType,
+ desc.cacheType(),
+ locStartTopVer,
+ desc.receivedFrom(),
affNode,
true,
memPlc,
@@ -1738,17 +1636,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Collection of started cache names.
*/
public Collection<String> cacheNames() {
- return F.viewReadOnly(cacheDescriptors(),
- new IgniteClosure<DynamicCacheDescriptor, String>() {
- @Override public String apply(DynamicCacheDescriptor desc) {
- return desc.cacheConfiguration().getName();
- }
- },
- new IgnitePredicate<DynamicCacheDescriptor>() {
- @Override public boolean apply(DynamicCacheDescriptor desc) {
- return desc.started();
- }
- });
+ return F.viewReadOnly(cacheDescriptors(), new IgniteClosure<DynamicCacheDescriptor, String>() {
+ @Override public String apply(DynamicCacheDescriptor desc) {
+ return desc.cacheConfiguration().getName();
+ }
+ });
}
/**
@@ -1771,7 +1663,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
if (start) {
- for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+ for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
DynamicCacheDescriptor desc = e.getValue();
CacheConfiguration ccfg = desc.cacheConfiguration();
@@ -1822,140 +1714,138 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param req Cache start request.
- * @param topVer Topology version.
+ * @param cacheDesc Cache descriptor.
+ * @param nearCfg Near cache configuration.
+ * @param exchTopVer Current exchange version.
* @throws IgniteCheckedException If failed.
*/
- public void prepareCacheStart(DynamicCacheChangeRequest req, AffinityTopologyVersion topVer)
+ void prepareCacheStart(DynamicCacheDescriptor cacheDesc,
+ @Nullable NearCacheConfiguration nearCfg,
+ AffinityTopologyVersion exchTopVer)
throws IgniteCheckedException {
- assert req.start() : req;
- assert req.cacheType() != null : req;
+ prepareCacheStart(
+ cacheDesc.cacheConfiguration(),
+ nearCfg,
+ cacheDesc,
+ exchTopVer,
+ cacheDesc.schema()
+ );
+ }
- DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
+ /**
+ * @param exchTopVer Current exchange version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException {
+ List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin();
- if (desc != null)
- desc.onStart();
+ if (!F.isEmpty(caches)) {
+ for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) {
+ DynamicCacheDescriptor desc = t.get1();
- prepareCacheStart(
- req.startCacheConfiguration(),
- req.nearCacheConfiguration(),
- req.cacheType(),
- req.clientStartOnly(),
- req.initiatingNodeId(),
- req.deploymentId(),
- topVer,
- desc != null ? desc.schema() : null
- );
+ prepareCacheStart(
+ desc.cacheConfiguration(),
+ t.get2(),
+ desc,
+ exchTopVer,
+ desc.schema()
+ );
+ }
+ }
}
/**
* Starts statically configured caches received from remote nodes during exchange.
*
- * @param topVer Topology version.
+ * @param nodeId Joining node ID.
+ * @param exchTopVer Current exchange version.
* @return Started caches descriptors.
* @throws IgniteCheckedException If failed.
*/
- public Collection<DynamicCacheDescriptor> startReceivedCaches(AffinityTopologyVersion topVer)
+ public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
throws IgniteCheckedException {
- List<DynamicCacheDescriptor> started = null;
-
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) {
- if (desc.receivedFrom() != null) {
- AffinityTopologyVersion startVer = desc.receivedFromStartVersion();
+ List<DynamicCacheDescriptor> started = cachesInfo.cachesReceivedFromJoin(nodeId);
- if (startVer == null || startVer.compareTo(topVer) > 0)
- continue;
- }
-
- if (desc.onStart()) {
- if (started == null)
- started = new ArrayList<>();
-
- started.add(desc);
+ if (started != null) {
+ for (DynamicCacheDescriptor desc : started) {
+ IgnitePredicate<ClusterNode> filter = desc.cacheConfiguration().getNodeFilter();
+ if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
prepareCacheStart(
desc.cacheConfiguration(),
null,
- desc.cacheType(),
- false,
- null,
- desc.deploymentId(),
- topVer,
+ desc,
+ exchTopVer,
desc.schema()
);
}
}
}
- return started;
+ return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
}
/**
- * @param cfg Start configuration.
- * @param nearCfg Near configuration.
- * @param cacheType Cache type.
- * @param clientStartOnly Client only start request.
- * @param initiatingNodeId Initiating node ID.
- * @param deploymentId Deployment ID.
- * @param topVer Topology version.
+ * @param startCfg Start configuration.
+ * @param reqNearCfg Near configuration if specified for client cache start request.
+ * @param desc Cache descriptor.
+ * @param exchTopVer Current exchange version.
* @param schema Query schema.
* @throws IgniteCheckedException If failed.
*/
private void prepareCacheStart(
- CacheConfiguration cfg,
- NearCacheConfiguration nearCfg,
- CacheType cacheType,
- boolean clientStartOnly,
- UUID initiatingNodeId,
- IgniteUuid deploymentId,
- AffinityTopologyVersion topVer,
+ CacheConfiguration startCfg,
+ @Nullable NearCacheConfiguration reqNearCfg,
+ DynamicCacheDescriptor desc,
+ AffinityTopologyVersion exchTopVer,
@Nullable QuerySchema schema
) throws IgniteCheckedException {
- CacheConfiguration ccfg = new CacheConfiguration(cfg);
-
- IgnitePredicate nodeFilter = ccfg.getNodeFilter();
+ assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
- ClusterNode locNode = ctx.discovery().localNode();
+ CacheConfiguration ccfg = new CacheConfiguration(startCfg);
- boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
- boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
+ CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
- if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
- return;
+ boolean affNode;
- if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) {
- if (clientNodeStart && !affNodeStart) {
- if (nearCfg != null)
- ccfg.setNearConfiguration(nearCfg);
- else
- ccfg.setNearConfiguration(null);
- }
+ if (ccfg.getCacheMode() == LOCAL) {
+ affNode = true;
- CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+ ccfg.setNearConfiguration(null);
+ }
+ else if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter()))
+ affNode = true;
+ else {
+ affNode = false;
- GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
+ ccfg.setNearConfiguration(reqNearCfg);
+ }
- cacheCtx.startTopologyVersion(topVer);
+ GridCacheContext cacheCtx = createCache(ccfg,
+ null,
+ desc,
+ exchTopVer,
+ cacheObjCtx,
+ affNode,
+ true);
- cacheCtx.dynamicDeploymentId(deploymentId);
+ cacheCtx.dynamicDeploymentId(desc.deploymentId());
- GridCacheAdapter cache = cacheCtx.cache();
+ GridCacheAdapter cache = cacheCtx.cache();
- sharedCtx.addCacheContext(cacheCtx);
+ sharedCtx.addCacheContext(cacheCtx);
- caches.put(cacheCtx.name(), cache);
+ caches.put(cacheCtx.name(), cache);
- startCache(cache, schema != null ? schema : new QuerySchema());
+ startCache(cache, schema != null ? schema : new QuerySchema());
- onKernalStart(cache);
- }
+ onKernalStart(cache);
}
/**
* @param req Stop request.
*/
- public void blockGateway(DynamicCacheChangeRequest req) {
+ void blockGateway(DynamicCacheChangeRequest req) {
assert req.stop() || req.close();
if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
@@ -1997,9 +1887,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.removeCacheContext(ctx);
- assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req +
- ", ctxDepId=" + ctx.dynamicDeploymentId() + ']';
-
onKernalStop(cache, req.destroy());
stopCache(cache, true, req.destroy());
@@ -2010,52 +1897,52 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Callback invoked when first exchange future for dynamic cache is completed.
*
* @param topVer Completed topology version.
- * @param reqs Change requests.
+ * @param exchActions Change requests.
* @param err Error.
*/
@SuppressWarnings("unchecked")
public void onExchangeDone(
AffinityTopologyVersion topVer,
- Collection<DynamicCacheChangeRequest> reqs,
+ @Nullable ExchangeActions exchActions,
Throwable err
) {
for (GridCacheAdapter<?, ?> cache : caches.values()) {
GridCacheContext<?, ?> cacheCtx = cache.context();
- if (F.eq(cacheCtx.startTopologyVersion(), topVer)) {
+ if (cacheCtx.startTopologyVersion().equals(topVer)) {
+ jCacheProxies.put(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+
if (cacheCtx.preloader() != null)
cacheCtx.preloader().onInitialExchangeComplete(err);
+ }
+ }
- String masked = cacheCtx.name();
+ if (exchActions != null && err == null) {
+ for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
+ stopGateway(action.request());
- jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
+ prepareCacheStop(action.request());
}
- }
- if (!F.isEmpty(reqs) && err == null) {
- for (DynamicCacheChangeRequest req : reqs) {
- String masked = req.cacheName();
+ for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) {
+ String cacheName = req.cacheName();
- if (req.stop()) {
- stopGateway(req);
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
- prepareCacheStop(req);
- }
- else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(masked);
+ if (proxy != null) {
+ if (proxy.context().affinityNode()) {
+ GridCacheAdapter<?, ?> cache = caches.get(cacheName);
- if (proxy != null) {
- if (proxy.context().affinityNode()) {
- GridCacheAdapter<?, ?> cache = caches.get(masked);
+ assert cache != null : cacheName;
- if (cache != null)
- jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
- }
- else {
- proxy.context().gate().onStopped();
+ jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false));
+ }
+ else {
+ jCacheProxies.remove(cacheName);
- prepareCacheStop(req);
- }
+ proxy.context().gate().onStopped();
+
+ prepareCacheStop(req);
}
}
}
@@ -2063,20 +1950,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * @param req Request to complete future for.
+ * @param cacheName Cache name.
+ * @param deploymentId Future deployment ID.
*/
- public void completeStartFuture(DynamicCacheChangeRequest req) {
- DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
-
- assert req.deploymentId() != null || req.globalStateChange() || req.resetLostPartitions();
- assert fut == null || fut.deploymentId != null || req.globalStateChange() || req.resetLostPartitions();
+ void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
+ GridCacheProcessor.TemplateConfigurationFuture fut =
+ (GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(cacheName);
- if (fut != null && F.eq(fut.deploymentId(), req.deploymentId()) &&
- F.eq(req.initiatingNodeId(), ctx.localNodeId()))
+ if (fut != null && fut.deploymentId().equals(deploymentId))
fut.onDone();
}
/**
+ * @param req Request to complete future for.
+ * @param err Error if any.
+ */
+ void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) {
+ if (req.initiatingNodeId().equals(ctx.localNodeId())) {
+ DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
+
+ if (fut != null)
+ fut.onDone(null, err);
+ }
+ }
+
+ /**
* Creates shared context.
*
* @param kernalCtx Kernal context.
@@ -2132,322 +2030,40 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+ cachesInfo.collectJoiningNodeData(dataBag);
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- dataBag.addNodeSpecificData(CACHE_PROC.ordinal(), getDiscoveryData(dataBag.joiningNodeId()));
+ cachesInfo.collectGridNodeData(dataBag);
}
/**
- * @param joiningNodeId Joining node id.
+ * @return {@code True} if all existing caches need to be started locally on client node start.
*/
- private Serializable getDiscoveryData(UUID joiningNodeId) {
- boolean reconnect = ctx.localNodeId().equals(joiningNodeId) && cachesOnDisconnect != null;
+ private boolean startAllCachesOnClientStart() {
+ return START_CLIENT_CACHES && ctx.clientNode();
+ }
- // Collect dynamically started caches to a single object.
- Collection<DynamicCacheChangeRequest> reqs;
+ /** {@inheritDoc} */
+ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
+ cachesInfo.onJoiningNodeDataReceived(data);
+ }
- Map<String, Map<UUID, Boolean>> clientNodesMap;
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ cachesInfo.onGridDataReceived(data);
+ }
- if (reconnect) {
- reqs = new ArrayList<>(caches.size() + 1);
-
- clientNodesMap = U.newHashMap(caches.size());
-
- collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
- }
- else {
- reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1);
-
- clientNodesMap = ctx.discovery().clientNodesMap();
-
- collectDataOnGridNode(reqs);
- }
-
- DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
-
- batch.clientNodes(clientNodesMap);
-
- batch.clientReconnect(reconnect);
-
- if (ctx.localNodeId().equals(joiningNodeId))
- batch.startCaches(startAllCachesOnClientStart());
-
- // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
- batch.id(null);
-
- return batch;
- }
-
- /**
- * @param reqs requests.
- */
- private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) {
- for (DynamicCacheDescriptor desc : cacheDescriptors()) {
- // RequestId must be null because on different node will be different byte [] and
- // we get duplicate discovery data, for more details see
- // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
- null);
-
- req.startCacheConfiguration(desc.cacheConfiguration());
- req.cacheType(desc.cacheType());
- req.deploymentId(desc.deploymentId());
- req.receivedFrom(desc.receivedFrom());
- req.schema(desc.schema());
-
- reqs.add(req);
- }
-
- for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
- // RequestId must be null because on different node will be different byte [] and
- // we get duplicate discovery data, for more details see
- // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(),
- null);
-
- req.startCacheConfiguration(desc.cacheConfiguration());
- req.schema(desc.schema());
-
- req.template(true);
-
- reqs.add(req);
- }
- }
-
- /**
- * @param reqs requests.
- * @param clientNodesMap Client nodes map.
- * @param nodeId Node id.
- */
- private void collectDataOnReconnectingNode(
- Collection<DynamicCacheChangeRequest> reqs,
- Map<String, Map<UUID, Boolean>> clientNodesMap,
- UUID nodeId
- ) {
- for (GridCacheAdapter<?, ?> cache : caches.values()) {
- DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name());
-
- if (desc == null)
- continue;
-
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null);
-
- req.startCacheConfiguration(desc.cacheConfiguration());
- req.cacheType(desc.cacheType());
- req.deploymentId(desc.deploymentId());
- req.receivedFrom(desc.receivedFrom());
- req.schema(desc.schema());
-
- reqs.add(req);
-
- Boolean nearEnabled = cache.isNear();
-
- Map<UUID, Boolean> map = U.newHashMap(1);
-
- map.put(nodeId, nearEnabled);
-
- clientNodesMap.put(cache.name(), map);
- }
- }
-
- /**
- * @return {@code True} if need locally start all existing caches on client node start.
- */
- private boolean startAllCachesOnClientStart() {
- return START_CLIENT_CACHES && ctx.clientNode();
- }
-
- /** {@inheritDoc} */
- @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
- if (data.hasJoiningNodeData()) {
- Serializable joiningNodeData = data.joiningNodeData();
- if (joiningNodeData instanceof DynamicCacheChangeBatch)
- onDiscoDataReceived(
- data.joiningNodeId(),
- data.joiningNodeId(),
- (DynamicCacheChangeBatch) joiningNodeData, true);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onGridDataReceived(GridDiscoveryData data) {
- Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
-
- if (nodeSpecData != null) {
- for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) {
- if (e.getValue() != null && e.getValue() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch) e.getValue();
-
- onDiscoDataReceived(data.joiningNodeId(), e.getKey(), batch, false);
- }
- }
- }
- }
-
- /**
- * @param joiningNodeId Joining node id.
- * @param rmtNodeId Rmt node id.
- * @param batch Batch.
- * @param join Whether this is data from joining node.
- */
- private void onDiscoDataReceived(UUID joiningNodeId, UUID rmtNodeId, DynamicCacheChangeBatch batch, boolean join) {
- if (batch.clientReconnect()) {
- if (ctx.clientDisconnected()) {
- if (clientReconnectReqs == null)
- clientReconnectReqs = new LinkedHashMap<>();
-
- clientReconnectReqs.put(joiningNodeId, batch);
-
- return;
- }
-
- processClientReconnectData(joiningNodeId, batch);
- }
- else {
- for (DynamicCacheChangeRequest req : batch.requests()) {
- initReceivedCacheConfiguration(req);
-
- if (req.template()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- assert ccfg != null : req;
-
- DynamicCacheDescriptor existing = registeredTemplates.get(req.cacheName());
-
- if (existing == null) {
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- true,
- req.deploymentId(),
- req.schema());
-
- registeredTemplates.put(req.cacheName(), desc);
- }
-
- continue;
- }
-
- DynamicCacheDescriptor existing = cacheDescriptor(req.cacheName());
-
- if (req.start() && !req.clientStartOnly()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- if (existing != null) {
- if (joiningNodeId.equals(ctx.localNodeId())) {
- existing.receivedFrom(req.receivedFrom());
- existing.deploymentId(req.deploymentId());
- }
-
- if (existing.locallyConfigured()) {
- existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
-
- if (!join)
- // Overwrite existing with remote.
- existing.schema(req.schema());
-
- ctx.discovery().setCacheFilter(
- req.cacheName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode());
- }
- }
- else {
- assert req.cacheType() != null : req;
-
- DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
- ctx,
- ccfg,
- req.cacheType(),
- false,
- req.deploymentId(),
- req.schema());
-
- // Received statically configured cache.
- if (req.initiatingNodeId() == null)
- desc.staticallyConfigured(true);
-
- if (joiningNodeId.equals(ctx.localNodeId()))
- desc.receivedOnDiscovery(true);
-
- desc.receivedFrom(req.receivedFrom());
-
- DynamicCacheDescriptor old = cacheDescriptor(req.cacheName(), desc);
-
- assert old == null : old;
-
- ctx.discovery().setCacheFilter(
- req.cacheName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode());
- }
- }
- }
-
- if (!F.isEmpty(batch.clientNodes())) {
- for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) {
- String cacheName = entry.getKey();
-
- for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
- ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
- }
- }
-
- if (batch.startCaches()) {
- for (Map.Entry<String, DynamicCacheDescriptor> entry : registeredCaches.entrySet())
- ctx.discovery().addClientNode(entry.getKey(), joiningNodeId, false);
- }
- }
- }
-
- /**
- * @param clientNodeId Client node ID.
- * @param batch Cache change batch.
- */
- private void processClientReconnectData(UUID clientNodeId, DynamicCacheChangeBatch batch) {
- assert batch.clientReconnect() : batch;
-
- for (DynamicCacheChangeRequest req : batch.requests()) {
- assert !req.template() : req;
-
- initReceivedCacheConfiguration(req);
-
- String name = req.cacheName();
-
- boolean sysCache = CU.isUtilityCache(name) || CU.isAtomicsCache(name);
-
- if (!sysCache) {
- DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
-
- if (desc != null && desc.deploymentId().equals(req.deploymentId())) {
- Map<UUID, Boolean> nodes = batch.clientNodes().get(name);
-
- assert nodes != null : req;
- assert nodes.containsKey(clientNodeId) : nodes;
-
- ctx.discovery().addClientNode(req.cacheName(), clientNodeId, nodes.get(clientNodeId));
- }
- }
- else
- ctx.discovery().addClientNode(req.cacheName(), clientNodeId, false);
- }
- }
-
- /**
- * Dynamically starts cache using template configuration.
- *
- * @param cacheName Cache name.
- * @return Future that will be completed when cache is deployed.
- */
- public IgniteInternalFuture<?> createFromTemplate(String cacheName) {
- try {
- CacheConfiguration cfg = createConfigFromTemplate(cacheName);
+ /**
+ * Dynamically starts cache using template configuration.
+ *
+ * @param cacheName Cache name.
+ * @return Future that will be completed when cache is deployed.
+ */
+ public IgniteInternalFuture<?> createFromTemplate(String cacheName) {
+ try {
+ CacheConfiguration cfg = createConfigFromTemplate(cacheName);
return dynamicStartCache(cfg, cacheName, null, true, true, true);
}
@@ -2491,7 +2107,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<CacheConfiguration> wildcardNameCfgs = null;
- for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+ for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) {
assert desc.template();
CacheConfiguration cfg = desc.cacheConfiguration();
@@ -2703,12 +2319,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (checkThreadTx)
checkEmptyTransactions();
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
-
- t.stop(true);
- t.destroy(true);
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, true);
- return F.first(initiateCacheChanges(F.asList(t), false));
+ return F.first(initiateCacheChanges(F.asList(req), false));
}
/**
@@ -2723,11 +2336,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
for (String cacheName : cacheNames) {
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, true);
- t.stop(true);
-
- reqs.add(t);
+ reqs.add(req);
}
GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
@@ -2744,7 +2355,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cacheName Cache name to close.
* @return Future that will be completed when cache is closed.
*/
- public IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
+ IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
assert cacheName != null;
IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
@@ -2754,9 +2365,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
checkEmptyTransactions();
- DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
-
- t.close(true);
+ DynamicCacheChangeRequest t = DynamicCacheChangeRequest.closeRequest(ctx, cacheName);
return F.first(initiateCacheChanges(F.asList(t), false));
}
@@ -2771,7 +2380,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
checkEmptyTransactions();
if (F.isEmpty(cacheNames))
- cacheNames = registeredCaches.keySet();
+ cacheNames = cachesInfo.registeredCaches().keySet();
Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
@@ -2779,16 +2388,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
if (desc == null) {
- log.warning("Reset lost partition will not be executed, " +
- "because cache with name:" + cacheName + " doesn't not exist");
+ U.warn(log, "Failed to find cache for reset lost partition request, cache does not exist: " + cacheName);
continue;
}
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
- UUID.randomUUID(), cacheName, ctx.localNodeId());
-
- req.markResetLostPartitions();
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.resetLostPartitions(ctx, cacheName);
reqs.add(req);
}
@@ -2841,14 +2446,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
for (String cacheName : cacheNames()) {
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
- UUID.randomUUID(), cacheName, ctx.localNodeId());
-
- DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
-
- req.deploymentId(desc.deploymentId());
- req.stop(true);
- req.destroy(false);
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false);
reqs.add(req);
}
@@ -2876,11 +2474,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
String cacheName = cfg.getName();
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
- UUID.randomUUID(), cacheName, ctx.localNodeId());
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
req.startCacheConfiguration(cfg);
- req.template(cfg.getName() != null && cfg.getName().endsWith("*"));
+ req.template(cfg.getName().endsWith("*"));
req.nearCacheConfiguration(cfg.getNearConfiguration());
req.deploymentId(IgniteUuid.randomUuid());
req.schema(new QuerySchema(cfg.getQueryEntities()));
@@ -2910,7 +2507,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
for (DynamicCacheChangeRequest req : reqs) {
- DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req.deploymentId(), req);
+ DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req);
try {
if (req.stop() || req.close()) {
@@ -2927,14 +2524,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
req.stop(true);
}
-
- IgniteUuid dynamicDeploymentId = desc.deploymentId();
-
- assert dynamicDeploymentId != null : desc;
-
- // Save deployment ID to avoid concurrent stops.
- req.deploymentId(dynamicDeploymentId);
- fut.deploymentId = dynamicDeploymentId;
}
}
@@ -2944,7 +2533,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent(
req.requestId(), fut);
- assert old == null; //TODO : check failIfExists.
+ assert old == null;
if (fut.isDone())
continue;
@@ -2993,12 +2582,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param topVer Topology version.
*/
public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
- if (type == EVT_NODE_JOINED) {
- for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) {
- if (node.id().equals(cacheDesc.receivedFrom()))
- cacheDesc.receivedFromStartVersion(topVer);
- }
- }
+ cachesInfo.onDiscoveryEvent(type, node, topVer);
sharedCtx.affinity().onDiscoveryEvent(type, node, topVer);
}
@@ -3025,198 +2609,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return true;
if (msg instanceof DynamicCacheChangeBatch)
- return onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
+ return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
return false;
}
/**
- * @param batch Change request batch.
- * @param topVer Current topology version.
- * @return {@code True} if minor topology version should be increased.
- */
- private boolean onCacheChangeRequested(
- DynamicCacheChangeBatch batch,
- AffinityTopologyVersion topVer
- ) {
- AffinityTopologyVersion newTopVer = null;
-
- boolean incMinorTopVer = false;
-
- for (DynamicCacheChangeRequest req : batch.requests()) {
- initReceivedCacheConfiguration(req);
-
- if (req.template()) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- assert ccfg != null : req;
-
- DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
-
- if (desc == null) {
- DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true,
- req.deploymentId(), req.schema());
-
- DynamicCacheDescriptor old = registeredTemplates.put(ccfg.getName(), templateDesc);
-
- assert old == null :
- "Dynamic cache map was concurrently modified [new=" + templateDesc + ", old=" + old + ']';
- }
-
- TemplateConfigurationFuture fut =
- (TemplateConfigurationFuture)pendingTemplateFuts.get(ccfg.getName());
-
- if (fut != null && fut.deploymentId().equals(req.deploymentId()))
- fut.onDone();
-
- continue;
- }
-
- DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
-
- DynamicCacheStartFuture fut = null;
-
- if (ctx.localNodeId().equals(req.initiatingNodeId())) {
- fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
-
- if (fut != null && !F.eq(req.deploymentId(), fut.deploymentId()))
- fut = null;
- }
-
- boolean needExchange = false;
-
- if (req.start()) {
- if (desc == null) {
- if (req.clientStartOnly()) {
- if (fut != null)
- fut.onDone(new IgniteCheckedException("Failed to start client cache " +
- "(a cache with the given name is not started): " + U.maskName(req.cacheName())));
- }
- else {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- assert req.cacheType() != null : req;
- assert F.eq(ccfg.getName(), req.cacheName()) : req;
-
- DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false,
- req.deploymentId(), req.schema());
-
- if (newTopVer == null) {
- newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
- topVer.minorTopologyVersion() + 1);
- }
-
- startDesc.startTopologyVersion(newTopVer);
-
- DynamicCacheDescriptor old = cacheDescriptor(ccfg.getName(), startDesc);
-
- assert old == null :
- "Dynamic cache map was concurrently modified [new=" + startDesc + ", old=" + old + ']';
-
- ctx.discovery().setCacheFilter(
- ccfg.getName(),
- ccfg.getNodeFilter(),
- ccfg.getNearConfiguration() != null,
- ccfg.getCacheMode());
-
- ctx.discovery().addClientNode(req.cacheName(),
- req.initiatingNodeId(),
- req.nearCacheConfiguration() != null);
-
- needExchange = true;
- }
- }
- else {
- assert req.initiatingNodeId() != null : req;
-
- // Cache already exists, exchange is needed only if client cache should be created.
- ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
-
- boolean clientReq = node != null &&
- !ctx.discovery().cacheAffinityNode(node, req.cacheName());
-
- if (req.clientStartOnly()) {
- needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
- req.initiatingNodeId(),
- req.nearCacheConfiguration() != null);
- }
- else {
- if (req.failIfExists()) {
- if (fut != null)
- fut.onDone(new CacheExistsException("Failed to start cache " +
- "(a cache with the same name is already started): " + U.maskName(req.cacheName())));
- }
- else {
- needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
- req.initiatingNodeId(),
- req.nearCacheConfiguration() != null);
-
- if (needExchange)
- req.clientStartOnly(true);
- }
- }
-
- if (needExchange) {
- if (newTopVer == null) {
- newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(),
- topVer.minorTopologyVersion() + 1);
- }
-
- desc.clientCacheStartVersion(newTopVer);
- }
- }
-
- if (!needExchange && desc != null) {
- if (desc.clientCacheStartVersion() != null)
- req.cacheFutureTopologyVersion(desc.clientCacheStartVersion());
- else
- req.cacheFutureTopologyVersion(desc.startTopologyVersion());
- }
- }
- else if (req.globalStateChange() || req.resetLostPartitions())
- needExchange = true;
- else {
- assert req.stop() ^ req.close() : req;
-
- if (desc != null) {
- if (req.stop()) {
- DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
-
- assert old != null : "Dynamic cache map was concurrently modified [req=" + req + ']';
-
- ctx.discovery().removeCacheFilter(req.cacheName());
-
- needExchange = true;
- }
- else {
- assert req.close() : req;
-
- needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
- }
- }
- }
-
- req.exchangeNeeded(needExchange);
-
- incMinorTopVer |= needExchange;
- }
-
- return incMinorTopVer;
- }
-
- /**
- * @param req Cache change request.
- */
- private void initReceivedCacheConfiguration(DynamicCacheChangeRequest req) {
- if (req.startCacheConfiguration() != null) {
- CacheConfiguration ccfg = req.startCacheConfiguration();
-
- if (ccfg.isStoreKeepBinary() == null)
- ccfg.setStoreKeepBinary(CacheConfiguration.DFLT_STORE_KEEP_BINARY);
- }
- }
-
- /**
* Checks that preload-order-dependant caches has SYNC or ASYNC preloading mode.
*
* @param cfgs Caches.
@@ -3291,110 +2689,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
- * Checks that remote caches has configuration compatible with the local.
- *
- * @param locCfg Local configuration.
- * @param rmtCfg Remote configuration.
- * @param rmtNode Remote node.
- * @throws IgniteCheckedException If check failed.
- */
- private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) throws IgniteCheckedException {
- ClusterNode locNode = ctx.discovery().localNode();
-
- UUID rmt = rmtNode.id();
-
- GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg);
- GridCacheAttributes locAttr = new GridCacheAttributes(locCfg);
-
- boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter());
- boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter());
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
- locAttr.cacheMode(), rmtAttr.cacheMode(), true);
-
- if (rmtAttr.cacheMode() != LOCAL) {
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor",
- locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode",
- "Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
- "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
-
- boolean checkStore = isLocAff && isRmtAff;
-
- if (checkStore)
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
- locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity",
- locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper",
- "Cache affinity mapper", locAttr.cacheAffinityMapperClassName(),
- rmtAttr.cacheAffinityMapperClassName(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount",
- "Affinity partitions count", locAttr.affinityPartitionsCount(),
- rmtAttr.affinityPartitionsCount(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter",
- locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy",
- locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup",
- "Transaction manager lookup", locAttr.transactionManagerLookupClassName(),
- rmtAttr.transactionManagerLookupClassName(), false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout",
- "Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize",
- "Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode",
- "Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(),
- true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize",
- "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(),
- false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled",
- "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency",
- "Write behind flush frequency", locAttr.writeBehindFlushFrequency(),
- rmtAttr.writeBehindFlushFrequency(), false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize",
- "Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(),
- false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount",
- "Write behind flush thread count", locAttr.writeBehindFlushThreadCount(),
- rmtAttr.writeBehindFlushThreadCount(), false);
-
- if (locAttr.cacheMode() == PARTITIONED) {
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy",
- "Near eviction policy", locAttr.nearEvictionPolicyClassName(),
- rmtAttr.nearEvictionPolicyClassName(), false);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors",
- "Affinity include neighbors", locAttr.affinityIncludeNeighbors(),
- rmtAttr.affinityIncludeNeighbors(), true);
-
- CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups",
- "Affinity key backups", locAttr.affinityKeyBackups(),
- rmtAttr.affinityKeyBackups(), true);
- }
- }
- }
-
- /**
* @param rmt Remote node to check.
* @throws IgniteCheckedException If check failed.
*/
@@ -3635,27 +2929,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @return Descriptor.
*/
public DynamicCacheDescriptor cacheDescriptor(String name) {
- return name != null ? registeredCaches.get(name) : null;
- }
-
- /**
- * Put registered cache descriptor.
- *
- * @param name Name.
- * @param desc Descriptor.
- * @return Old descriptor (if any).
- */
- private DynamicCacheDescriptor cacheDescriptor(String name, DynamicCacheDescriptor desc) {
- assert name != null;
-
- return registeredCaches.put(name, desc);
+ return cachesInfo.registeredCaches().get(name);
}
/**
* @return Cache descriptors.
*/
public Collection<DynamicCacheDescriptor> cacheDescriptors() {
- return registeredCaches.values();
+ return cachesInfo.registeredCaches().values();
}
/**
@@ -3682,23 +2963,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException {
assert cacheCfg.getName() != null;
- String masked = cacheCfg.getName();
+ String name = cacheCfg.getName();
- DynamicCacheDescriptor desc = registeredTemplates.get(masked);
+ DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(name);
if (desc != null)
return;
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheCfg.getName(), ctx.localNodeId());
-
- CacheConfiguration cfg = new CacheConfiguration(cacheCfg);
-
- req.template(true);
-
- req.startCacheConfiguration(cfg);
- req.schema(new QuerySchema(cfg.getQueryEntities()));
-
- req.deploymentId(IgniteUuid.randomUuid());
+ DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg);
TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId());
@@ -3749,6 +3021,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param name Cache name.
+ * @return Cache proxy.
+ */
+ @Nullable public IgniteCacheProxy jcacheProxy(String name) {
+ return jCacheProxies.get(name);
+ }
+
+ /**
* @return All configured public cache instances.
*/
public Collection<IgniteCacheProxy<?, ?>> publicCaches() {
@@ -3859,7 +3139,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException In case of error.
*/
public void createMissingQueryCaches() throws IgniteCheckedException {
- for (Map.Entry<String, DynamicCacheDescriptor> e : registeredCaches.entrySet()) {
+ for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
DynamicCacheDescriptor desc = e.getValue();
if (isMissingQueryCache(desc))
@@ -4171,10 +3451,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
private class DynamicCacheStartFuture extends GridFutureAdapter<Object> {
- /** Start ID. */
- @GridToStringInclude
- private IgniteUuid deploymentId;
-
/** Cache name. */
private String cacheName;
@@ -4184,23 +3460,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cacheName Cache name.
- * @param deploymentId Deployment ID.
* @param req Cache start request.
*/
- private DynamicCacheStartFuture(String cacheName, IgniteUuid deploymentId, DynamicCacheChangeRequest req) {
- this.deploymentId = deploymentId;
+ private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) {
this.cacheName = cacheName;
this.req = req;
}
/**
- * @return Start ID.
- */
- public IgniteUuid deploymentId() {
- return deploymentId;
- }
-
- /**
* @return Request.
*/
public DynamicCacheChangeRequest request() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f3c3a1b..1de64c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -281,7 +281,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+ if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheStarted(cacheId)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 94f11ed..f80adc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
/**
* Affinity assignment request.
@@ -32,6 +31,9 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** Future ID. */
+ private long futId;
+
/** Topology version being queried. */
private AffinityTopologyVersion topVer;
@@ -43,14 +45,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
}
/**
+ * @param futId Future ID.
* @param cacheId Cache ID.
* @param topVer Topology version.
*/
- public GridDhtAffinityAssignmentRequest(int cacheId, @NotNull AffinityTopologyVersion topVer) {
+ public GridDhtAffinityAssignmentRequest(
+ long futId,
+ int cacheId,
+ AffinityTopologyVersion topVer) {
+ assert topVer != null;
+
+ this.futId = futId;
this.cacheId = cacheId;
this.topVer = topVer;
}
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return false;
@@ -75,7 +91,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 5;
}
/** {@inheritDoc} */
@@ -94,6 +110,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
switch (writer.state()) {
case 3:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -116,6 +138,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
switch (reader.state()) {
case 3:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
[4/6] ignite git commit: Moved logic related to caches discovery data
handling to ClusterCachesInfo. Start of statically configured caches in the
same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 66e780f..3c65326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -18,10 +18,9 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -33,29 +32,22 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** Discovery custom message ID. */
+ private IgniteUuid id = IgniteUuid.randomUuid();
+
/** Change requests. */
@GridToStringInclude
private Collection<DynamicCacheChangeRequest> reqs;
- /** Client nodes map. Used in discovery data exchange. */
- @GridToStringInclude
- private Map<String, Map<UUID, Boolean>> clientNodes;
-
- /** Custom message ID. */
- private IgniteUuid id = IgniteUuid.randomUuid();
-
- /** */
- private boolean clientReconnect;
-
- /** */
- private boolean startCaches;
+ /** Cache updates to be executed on exchange. */
+ private transient ExchangeActions exchangeActions;
/**
* @param reqs Requests.
*/
- public DynamicCacheChangeBatch(
- Collection<DynamicCacheChangeRequest> reqs
- ) {
+ public DynamicCacheChangeBatch(Collection<DynamicCacheChangeRequest> reqs) {
+ assert !F.isEmpty(reqs) : reqs;
+
this.reqs = reqs;
}
@@ -64,34 +56,6 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
return id;
}
- /**
- * @param id Message ID.
- */
- public void id(IgniteUuid id) {
- this.id = id;
- }
-
- /**
- * @return Collection of change requests.
- */
- public Collection<DynamicCacheChangeRequest> requests() {
- return reqs;
- }
-
- /**
- * @return Client nodes map.
- */
- public Map<String, Map<UUID, Boolean>> clientNodes() {
- return clientNodes;
- }
-
- /**
- * @param clientNodes Client nodes map.
- */
- public void clientNodes(Map<String, Map<UUID, Boolean>> clientNodes) {
- this.clientNodes = clientNodes;
- }
-
/** {@inheritDoc} */
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
return null;
@@ -103,45 +67,33 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
}
/**
- * @param clientReconnect {@code True} if this is discovery data sent on client reconnect.
- */
- public void clientReconnect(boolean clientReconnect) {
- this.clientReconnect = clientReconnect;
- }
-
- /**
- * @return {@code True} if this is discovery data sent on client reconnect.
+ * @return Collection of change requests.
*/
- public boolean clientReconnect() {
- return clientReconnect;
+ public Collection<DynamicCacheChangeRequest> requests() {
+ return reqs;
}
/**
- * @return {@code True} if required to start all caches on client node.
+ * @return {@code True} if request should trigger partition exchange.
*/
- public boolean startCaches() {
- return startCaches;
+ public boolean exchangeNeeded() {
+ return exchangeActions != null;
}
/**
- * @param startCaches {@code True} if required to start all caches on client node.
+ * @return Cache updates to be executed on exchange.
*/
- public void startCaches(boolean startCaches) {
- this.startCaches = startCaches;
+ ExchangeActions exchangeActions() {
+ return exchangeActions;
}
/**
- * @return {@code True} if request should trigger partition exchange.
+ * @param exchangeActions Cache updates to be executed on exchange.
*/
- public boolean exchangeNeeded() {
- if (reqs != null) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.exchangeNeeded())
- return true;
- }
- }
+ void exchangeActions(ExchangeActions exchangeActions) {
+ assert exchangeActions != null && !exchangeActions.empty() : exchangeActions;
- return false;
+ this.exchangeActions = exchangeActions;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 9d2563d..f8c2c7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -88,78 +88,114 @@ public class DynamicCacheChangeRequest implements Serializable {
/** Dynamic schema. */
private QuerySchema schema;
- /** */
- private transient boolean exchangeNeeded;
-
- /** */
- private transient AffinityTopologyVersion cacheFutTopVer;
-
/**
- * Constructor creates cache stop request.
- *
+ * @param reqId Unique request ID.
* @param cacheName Cache stop name.
* @param initiatingNodeId Initiating node ID.
*/
public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) {
+ assert reqId != null;
+ assert cacheName != null;
+ assert initiatingNodeId != null;
+
this.reqId = reqId;
this.cacheName = cacheName;
this.initiatingNodeId = initiatingNodeId;
}
/**
- * @return Request ID.
+ * @param reqId Unique request ID.
+ * @param state New cluster state.
+ * @param initiatingNodeId Initiating node ID.
*/
- public UUID requestId() {
- return reqId;
+ public DynamicCacheChangeRequest(UUID reqId, ClusterState state, UUID initiatingNodeId) {
+ assert reqId != null;
+ assert state != null;
+ assert initiatingNodeId != null;
+
+ this.reqId = reqId;
+ this.state = state;
+ this.initiatingNodeId = initiatingNodeId;
}
/**
- * @return {@code True} if request should trigger partition exchange.
+ * @param ctx Context.
+ * @param cacheName Cache name.
+ * @return Request to reset lost partitions.
*/
- public boolean exchangeNeeded() {
- return exchangeNeeded;
+ static DynamicCacheChangeRequest resetLostPartitions(GridKernalContext ctx, String cacheName) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+ req.markResetLostPartitions();
+
+ return req;
}
/**
- * @return State.
+ * @param ctx Context.
+ * @param cfg0 Template configuration.
+ * @return Request to add template.
*/
- public ClusterState state() {
- return state;
+ static DynamicCacheChangeRequest addTemplateRequest(GridKernalContext ctx, CacheConfiguration<?, ?> cfg0) {
+ CacheConfiguration<?, ?> cfg = new CacheConfiguration<>(cfg0);
+
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cfg.getName(), ctx.localNodeId());
+
+ req.template(true);
+ req.startCacheConfiguration(cfg);
+ req.schema(new QuerySchema(cfg.getQueryEntities()));
+ req.deploymentId(IgniteUuid.randomUuid());
+
+ return req;
}
/**
- * @param state State.
+ * @param ctx Context.
+ * @param cacheName Cache name.
+ * @return Request to close client cache.
*/
- public void state(ClusterState state) {
- this.state = state;
+ static DynamicCacheChangeRequest closeRequest(GridKernalContext ctx, String cacheName) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+ req.close(true);
+
+ return req;
}
/**
- * @return {@code True} if global caches state is changes.
+ * @param ctx Context.
+ * @param cacheName Cache name.
+ * @param destroy Destroy flag.
+ * @return Cache stop request.
*/
- public boolean globalStateChange() {
- return state != null;
+ static DynamicCacheChangeRequest stopRequest(GridKernalContext ctx, String cacheName, boolean destroy) {
+ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+ req.stop(true);
+ req.destroy(destroy);
+
+ return req;
}
/**
- * @param cacheFutTopVer Ready topology version when dynamic cache future should be completed.
+ * @return Request ID.
*/
- public void cacheFutureTopologyVersion(AffinityTopologyVersion cacheFutTopVer) {
- this.cacheFutTopVer = cacheFutTopVer;
+ public UUID requestId() {
+ return reqId;
}
/**
- * @return Ready topology version when dynamic cache future should be completed.
+ * @return State.
*/
- @Nullable public AffinityTopologyVersion cacheFutureTopologyVersion() {
- return cacheFutTopVer;
+ public ClusterState state() {
+ return state;
}
/**
- * @param exchangeNeeded {@code True} if request should trigger partition exchange.
+ * @return {@code True} if this request changes the global cluster state.
*/
- public void exchangeNeeded(boolean exchangeNeeded) {
- this.exchangeNeeded = exchangeNeeded;
+ public boolean globalStateChange() {
+ return state != null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 09b4c3a..40d3706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -17,10 +17,9 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -46,21 +45,12 @@ public class DynamicCacheDescriptor {
@GridToStringExclude
private CacheConfiguration cacheCfg;
- /** Locally configured flag. */
- private boolean locCfg;
-
/** Statically configured flag. */
- private boolean staticCfg;
-
- /** Started flag. */
- private boolean started;
+ private final boolean staticCfg;
/** Cache type. */
private CacheType cacheType;
- /** */
- private volatile Map<UUID, CacheConfiguration> rmtCfgs;
-
/** Template configuration flag. */
private boolean template;
@@ -71,19 +61,10 @@ public class DynamicCacheDescriptor {
private boolean updatesAllowed = true;
/** */
- private AffinityTopologyVersion startTopVer;
-
- /** */
- private boolean rcvdOnDiscovery;
-
- /** */
private Integer cacheId;
/** */
- private UUID rcvdFrom;
-
- /** */
- private AffinityTopologyVersion rcvdFromVer;
+ private final UUID rcvdFrom;
/** Mutex. */
private final Object mux = new Object();
@@ -92,7 +73,16 @@ public class DynamicCacheDescriptor {
private volatile CacheObjectContext objCtx;
/** */
- private transient AffinityTopologyVersion clientCacheStartVer;
+ private boolean rcvdOnDiscovery;
+
+ /** */
+ private AffinityTopologyVersion startTopVer;
+
+ /** */
+ private AffinityTopologyVersion rcvdFromVer;
+
+ /** */
+ private AffinityTopologyVersion clientCacheStartVer;
/** Mutex to control schema. */
private final Object schemaMux = new Object();
@@ -105,21 +95,34 @@ public class DynamicCacheDescriptor {
* @param cacheCfg Cache configuration.
* @param cacheType Cache type.
* @param template {@code True} if this is template configuration.
+ * @param rcvdFrom ID of the node that provided the cache configuration.
+ * @param staticCfg {@code True} if the cache is statically configured.
* @param deploymentId Deployment ID.
+ * @param schema Query schema.
*/
@SuppressWarnings("unchecked")
public DynamicCacheDescriptor(GridKernalContext ctx,
CacheConfiguration cacheCfg,
CacheType cacheType,
boolean template,
+ UUID rcvdFrom,
+ boolean staticCfg,
IgniteUuid deploymentId,
QuerySchema schema) {
assert cacheCfg != null;
assert schema != null;
+ if (cacheCfg.getCacheMode() == CacheMode.REPLICATED && cacheCfg.getNearConfiguration() != null) {
+ cacheCfg = new CacheConfiguration(cacheCfg);
+
+ cacheCfg.setNearConfiguration(null);
+ }
+
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
this.template = template;
+ this.rcvdFrom = rcvdFrom;
+ this.staticCfg = staticCfg;
this.deploymentId = deploymentId;
pluginMgr = new CachePluginManager(ctx, cacheCfg);
@@ -139,20 +142,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @return Start topology version.
- */
- @Nullable public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
- }
-
- /**
- * @param startTopVer Start topology version.
- */
- public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
- this.startTopVer = startTopVer;
- }
-
- /**
* @return {@code True} if this is template configuration.
*/
public boolean template() {
@@ -174,27 +163,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @param deploymentId Deployment ID.
- */
- public void deploymentId(IgniteUuid deploymentId) {
- this.deploymentId = deploymentId;
- }
-
- /**
- * @return Locally configured flag.
- */
- public boolean locallyConfigured() {
- return locCfg;
- }
-
- /**
- * @param locCfg Locally configured flag.
- */
- public void locallyConfigured(boolean locCfg) {
- this.locCfg = locCfg;
- }
-
- /**
* @return {@code True} if statically configured.
*/
public boolean staticallyConfigured() {
@@ -202,30 +170,12 @@ public class DynamicCacheDescriptor {
}
/**
- * @param staticCfg {@code True} if statically configured.
+ * @return Cache name.
*/
- public void staticallyConfigured(boolean staticCfg) {
- this.staticCfg = staticCfg;
- }
+ public String cacheName() {
+ assert cacheCfg != null : this;
- /**
- * @return {@code True} if started flag was flipped by this call.
- */
- public boolean onStart() {
- if (!started) {
- started = true;
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @return Started flag.
- */
- public boolean started() {
- return started;
+ return cacheCfg.getName();
}
/**
@@ -239,6 +189,7 @@ public class DynamicCacheDescriptor {
* Creates and caches cache object context if needed.
*
* @param proc Object processor.
+ * @return Cache object context.
*/
public CacheObjectContext cacheObjectContext(IgniteCacheObjectProcessor proc) throws IgniteCheckedException {
if (objCtx == null) {
@@ -259,36 +210,6 @@ public class DynamicCacheDescriptor {
}
/**
- * @param nodeId Remote node ID.
- * @return Configuration.
- */
- public CacheConfiguration remoteConfiguration(UUID nodeId) {
- Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
- return cfgs == null ? null : cfgs.get(nodeId);
- }
-
- /**
- * @param nodeId Remote node ID.
- * @param cfg Remote node configuration.
- */
- public void addRemoteConfiguration(UUID nodeId, CacheConfiguration cfg) {
- Map<UUID, CacheConfiguration> cfgs = rmtCfgs;
-
- if (cfgs == null)
- rmtCfgs = cfgs = new HashMap<>();
-
- cfgs.put(nodeId, cfg);
- }
-
- /**
- *
- */
- public void clearRemoteConfigurations() {
- rmtCfgs = null;
- }
-
- /**
* @return Updates allowed flag.
*/
public boolean updatesAllowed() {
@@ -305,43 +226,51 @@ public class DynamicCacheDescriptor {
/**
* @return {@code True} if received in discovery data.
*/
- public boolean receivedOnDiscovery() {
+ boolean receivedOnDiscovery() {
return rcvdOnDiscovery;
}
/**
* @param rcvdOnDiscovery {@code True} if received in discovery data.
*/
- public void receivedOnDiscovery(boolean rcvdOnDiscovery) {
+ void receivedOnDiscovery(boolean rcvdOnDiscovery) {
this.rcvdOnDiscovery = rcvdOnDiscovery;
}
/**
- * @param nodeId ID of node provided cache configuration in discovery data.
+ * @return ID of node that provided cache configuration in discovery data.
*/
- public void receivedFrom(UUID nodeId) {
- rcvdFrom = nodeId;
+ @Nullable public UUID receivedFrom() {
+ return rcvdFrom;
}
/**
* @return Topology version when node provided cache configuration was started.
*/
- @Nullable public AffinityTopologyVersion receivedFromStartVersion() {
+ @Nullable AffinityTopologyVersion receivedFromStartVersion() {
return rcvdFromVer;
}
/**
* @param rcvdFromVer Topology version when node provided cache configuration was started.
*/
- public void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
+ void receivedFromStartVersion(AffinityTopologyVersion rcvdFromVer) {
this.rcvdFromVer = rcvdFromVer;
}
+
/**
- * @return ID of node provided cache configuration in discovery data.
+ * @return Start topology version.
*/
- @Nullable public UUID receivedFrom() {
- return rcvdFrom;
+ @Nullable public AffinityTopologyVersion startTopologyVersion() {
+ return startTopVer;
+ }
+
+ /**
+ * @param startTopVer Start topology version.
+ */
+ public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
+ this.startTopVer = startTopVer;
}
/**
@@ -354,7 +283,7 @@ public class DynamicCacheDescriptor {
/**
* @param clientCacheStartVer Version when client cache on local node was started.
*/
- public void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
+ void clientCacheStartVersion(AffinityTopologyVersion clientCacheStartVer) {
this.clientCacheStartVer = clientCacheStartVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
new file mode 100644
index 0000000..eac1120
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache change requests to execute when a {@link DynamicCacheChangeBatch} event is received.
+ */
+public class ExchangeActions {
+ /** Cache start actions, keyed by request cache name. */
+ private Map<String, ActionData> cachesToStart;
+
+ /** Client cache start actions, keyed by request cache name. */
+ private Map<String, ActionData> clientCachesToStart;
+
+ /** Cache stop actions, keyed by request cache name. */
+ private Map<String, ActionData> cachesToStop;
+
+ /** Cache close actions, keyed by request cache name. */
+ private Map<String, ActionData> cachesToClose;
+
+ /** Reset lost partitions actions, keyed by request cache name. */
+ private Map<String, ActionData> cachesToResetLostParts;
+
+ /** New cluster state, {@code null} until {@link #newClusterState(ClusterState)} is called. */
+ private ClusterState newState;
+
+ /**
+ * @return {@code True} if there are no cache start, stop or reset lost partitions actions.
+ */
+ boolean clientOnlyExchange() {
+ return F.isEmpty(cachesToStart) &&
+ F.isEmpty(cachesToStop) &&
+ F.isEmpty(cachesToResetLostParts);
+ }
+
+ /**
+ * @param nodeId Local node ID, compared with the initiating node ID of each close request.
+ * @return Close cache requests initiated by the given node (empty list if none).
+ */
+ List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) {
+ List<DynamicCacheChangeRequest> res = null;
+
+ if (cachesToClose != null) {
+ for (ActionData req : cachesToClose.values()) {
+ if (nodeId.equals(req.req.initiatingNodeId())) {
+ if (res == null)
+ res = new ArrayList<>(cachesToClose.size());
+
+ res.add(req.req);
+ }
+ }
+ }
+
+ return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList();
+ }
+
+ /**
+ * @return New caches start requests (empty collection if none).
+ */
+ Collection<ActionData> cacheStartRequests() {
+ return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList();
+ }
+
+ /**
+ * @return Start requests for both new caches and client caches (empty collection if none).
+ */
+ Collection<ActionData> newAndClientCachesStartRequests() {
+ if (cachesToStart != null || clientCachesToStart != null) {
+ List<ActionData> res = new ArrayList<>();
+
+ if (cachesToStart != null)
+ res.addAll(cachesToStart.values());
+
+ if (clientCachesToStart != null)
+ res.addAll(clientCachesToStart.values());
+
+ return res;
+ }
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * @return Stop cache requests (empty collection if none).
+ */
+ Collection<ActionData> cacheStopRequests() {
+ return cachesToStop != null ? cachesToStop.values() : Collections.<ActionData>emptyList();
+ }
+
+ /**
+ * @param ctx Shared context used to complete the start future of every recorded request.
+ */
+ public void completeRequestFutures(GridCacheSharedContext ctx) {
+ completeRequestFutures(cachesToStart, ctx);
+ completeRequestFutures(cachesToStop, ctx);
+ completeRequestFutures(cachesToClose, ctx);
+ completeRequestFutures(clientCachesToStart, ctx);
+ completeRequestFutures(cachesToResetLostParts, ctx);
+ }
+
+ /**
+ * @param map Actions map, may be {@code null} (then nothing is done).
+ * @param ctx Shared context; each request is completed with a {@code null} error argument.
+ */
+ private void completeRequestFutures(Map<String, ActionData> map, GridCacheSharedContext ctx) {
+ if (map != null) {
+ for (ActionData req : map.values())
+ ctx.cache().completeCacheStartFuture(req.req, null);
+ }
+ }
+
+ /**
+ * @return {@code True} if have cache stop requests.
+ */
+ public boolean hasStop() {
+ return !F.isEmpty(cachesToStop);
+ }
+
+ /**
+ * @return Copy of the names of caches to reset lost partitions for (empty set if none).
+ */
+ public Set<String> cachesToResetLostPartitions() {
+ Set<String> caches = null;
+
+ if (cachesToResetLostParts != null)
+ caches = new HashSet<>(cachesToResetLostParts.keySet());
+
+ return caches != null ? caches : Collections.<String>emptySet();
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return {@code True} if cache stop was requested.
+ */
+ public boolean cacheStopped(int cacheId) {
+ if (cachesToStop != null) {
+ for (ActionData cache : cachesToStop.values()) {
+ if (cache.desc.cacheId() == cacheId)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return {@code True} if cache start was requested.
+ */
+ public boolean cacheStarted(int cacheId) {
+ if (cachesToStart != null) {
+ for (ActionData cache : cachesToStart.values()) {
+ if (cache.desc.cacheId() == cacheId)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param nodeId Local node ID, compared with the initiating node ID of each request.
+ * @return {@code True} if a client cache start was initiated by the given node.
+ */
+ public boolean clientCacheStarted(UUID nodeId) {
+ if (clientCachesToStart != null) {
+ for (ActionData cache : clientCachesToStart.values()) {
+ if (nodeId.equals(cache.req.initiatingNodeId()))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @param state New cluster state, must not be {@code null}.
+ */
+ void newClusterState(ClusterState state) {
+ assert state != null;
+
+ newState = state;
+ }
+
+ /**
+ * @return New cluster state if state change was requested, {@code null} otherwise.
+ */
+ @Nullable public ClusterState newClusterState() {
+ return newState;
+ }
+
+ /**
+ * @param map Actions map, may be {@code null}.
+ * @param req Request, its cache name is used as the map key and must not be registered yet.
+ * @param desc Cache descriptor.
+ * @return Actions map (newly created if {@code map} was {@code null}).
+ */
+ private Map<String, ActionData> add(Map<String, ActionData> map,
+ DynamicCacheChangeRequest req,
+ DynamicCacheDescriptor desc) {
+ assert req != null;
+ assert desc != null;
+
+ if (map == null)
+ map = new HashMap<>();
+
+ ActionData old = map.put(req.cacheName(), new ActionData(req, desc));
+
+ assert old == null : old;
+
+ return map;
+ }
+
+ /**
+ * @param req Cache start request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.start() : req;
+
+ cachesToStart = add(cachesToStart, req, desc);
+ }
+
+ /**
+ * @param req Cache start request, recorded as a client cache start.
+ * @param desc Cache descriptor.
+ */
+ void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.start() : req;
+
+ clientCachesToStart = add(clientCachesToStart, req, desc);
+ }
+
+ /**
+ * @param req Cache stop request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.stop() : req;
+
+ cachesToStop = add(cachesToStop, req, desc);
+ }
+
+ /**
+ * @param req Cache close request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.close() : req;
+
+ cachesToClose = add(cachesToClose, req, desc);
+ }
+
+ /**
+ * @param req Reset lost partitions request.
+ * @param desc Cache descriptor.
+ */
+ void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req.resetLostPartitions() : req;
+
+ cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+ }
+
+ /**
+ * @return {@code True} if there are no cache change actions (cluster state is not considered).
+ */
+ public boolean empty() {
+ return F.isEmpty(cachesToStart) &&
+ F.isEmpty(clientCachesToStart) &&
+ F.isEmpty(cachesToStop) &&
+ F.isEmpty(cachesToClose) &&
+ F.isEmpty(cachesToResetLostParts);
+ }
+
+ /**
+ * Cache change request together with its cache descriptor.
+ */
+ static class ActionData {
+ /** Cache change request. */
+ private DynamicCacheChangeRequest req;
+
+ /** Cache descriptor. */
+ private DynamicCacheDescriptor desc;
+
+ /**
+ * @param req Request, must not be {@code null}.
+ * @param desc Cache descriptor, must not be {@code null}.
+ */
+ ActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
+ assert req != null;
+ assert desc != null;
+
+ this.req = req;
+ this.desc = desc;
+ }
+
+ /**
+ * @return Request.
+ */
+ public DynamicCacheChangeRequest request() {
+ return req;
+ }
+
+ /**
+ * @return Cache descriptor.
+ */
+ public DynamicCacheDescriptor descriptor() {
+ return desc;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index a0489fc..aa503b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -234,8 +234,11 @@ public class GridCacheContext<K, V> implements Externalizable {
/** */
private CountDownLatch startLatch = new CountDownLatch(1);
- /** Start topology version. */
- private AffinityTopologyVersion startTopVer;
+ /** Topology version when cache was started on local node. */
+ private AffinityTopologyVersion locStartTopVer;
+
+ /** */
+ private UUID rcvdFrom;
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
@@ -289,6 +292,8 @@ public class GridCacheContext<K, V> implements Externalizable {
GridCacheSharedContext sharedCtx,
CacheConfiguration cacheCfg,
CacheType cacheType,
+ AffinityTopologyVersion locStartTopVer,
+ UUID rcvdFrom,
boolean affNode,
boolean updatesAllowed,
MemoryPolicy memPlc,
@@ -316,6 +321,7 @@ public class GridCacheContext<K, V> implements Externalizable {
assert ctx != null;
assert sharedCtx != null;
assert cacheCfg != null;
+ assert locStartTopVer != null : cacheCfg.getName();
assert evtMgr != null;
assert storeMgr != null;
@@ -333,6 +339,8 @@ public class GridCacheContext<K, V> implements Externalizable {
this.sharedCtx = sharedCtx;
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
+ this.locStartTopVer = locStartTopVer;
+ this.rcvdFrom = rcvdFrom;
this.affNode = affNode;
this.updatesAllowed = updatesAllowed;
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
@@ -452,17 +460,19 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * @return Start topology version.
+ * @return Node ID cache was received from.
*/
- public AffinityTopologyVersion startTopologyVersion() {
- return startTopVer;
+ public UUID receivedFrom() {
+ return rcvdFrom;
}
/**
- * @param startTopVer Start topology version.
+ * @return Topology version when cache was started on local node.
*/
- public void startTopologyVersion(AffinityTopologyVersion startTopVer) {
- this.startTopVer = startTopVer;
+ public AffinityTopologyVersion startTopologyVersion() {
+ assert locStartTopVer != null : name();
+
+ return locStartTopVer;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4775ea1..04c647f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -235,34 +235,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (customMsg instanceof DynamicCacheChangeBatch) {
DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
- Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
-
- // Validate requests to check if event should trigger partition exchange.
- for (final DynamicCacheChangeRequest req : batch.requests()) {
- if (req.exchangeNeeded())
- valid.add(req);
- else {
- IgniteInternalFuture<?> fut = null;
-
- if (req.cacheFutureTopologyVersion() != null)
- fut = affinityReadyFuture(req.cacheFutureTopologyVersion());
-
- if (fut == null || fut.isDone())
- cctx.cache().completeStartFuture(req);
- else {
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.cache().completeStartFuture(req);
- }
- });
- }
- }
- }
+ ExchangeActions exchActions = batch.exchangeActions();
- if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) {
+ if (exchActions != null) {
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
- exchFut = exchangeFuture(exchId, evt, cache, valid, null);
+ exchFut = exchangeFuture(exchId, evt, cache, exchActions, null);
}
}
else if (customMsg instanceof CacheAffinityChangeMessage) {
@@ -385,10 +363,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
assert startTime > 0;
// Generate dummy discovery event for local node joining.
- T2<DiscoveryEvent, DiscoCache> localJoin = cctx.discovery().localJoin();
+ T2<DiscoveryEvent, DiscoCache> locJoin = cctx.discovery().localJoin();
- DiscoveryEvent discoEvt = localJoin.get1();
- DiscoCache discoCache = localJoin.get2();
+ DiscoveryEvent discoEvt = locJoin.get1();
+ DiscoCache discoCache = locJoin.get2();
GridDhtPartitionExchangeId exchId = initialExchangeId();
@@ -488,8 +466,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
+ AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0);
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.startTopologyVersion() == null)
+ if (nodeStartVer.equals(cacheCtx.startTopologyVersion()))
cacheCtx.preloader().onInitialExchangeComplete(null);
}
@@ -917,7 +897,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (exchId != null) {
AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
- ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+ ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
}
else
ready = cacheCtx.started();
@@ -1123,25 +1103,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
* @param cache Discovery data cache.
- * @param reqs Cache change requests.
+ * @param exchActions Cache change actions.
* @param affChangeMsg Affinity change message.
* @return Exchange future.
*/
private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@Nullable DiscoveryEvent discoEvt,
@Nullable DiscoCache cache,
- @Nullable Collection<DynamicCacheChangeRequest> reqs,
+ @Nullable ExchangeActions exchActions,
@Nullable CacheAffinityChangeMessage affChangeMsg) {
GridDhtPartitionsExchangeFuture fut;
GridDhtPartitionsExchangeFuture old = exchFuts.addx(
- fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg));
+ fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, exchActions, affChangeMsg));
if (old != null) {
fut = old;
- if (reqs != null)
- fut.cacheChangeRequests(reqs);
+ if (exchActions != null)
+ fut.exchangeActions(exchActions);
if (affChangeMsg != null)
fut.affinityChangeMessage(affChangeMsg);
@@ -1320,9 +1300,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
- if (cacheCtx != null && cacheCtx.startTopologyVersion() != null &&
- entry.getValue() != null &&
- entry.getValue().topologyVersion() != null && // Backward compatibility.
+ if (cacheCtx != null &&
cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
continue;
[2/6] ignite git commit: Moved logic related to caches discovery data
handling to ClusterCachesInfo. Start of statically configured caches in the
same way as dynamic ones: from GridDhtPartitionsExchangeFuture.
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e8094e1..5d82171 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -44,6 +44,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private long futId;
+
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -69,19 +72,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
}
/**
+ * @param futId Future ID.
* @param cacheId Cache ID.
* @param topVer Topology version.
* @param affAssignment Affinity assignment.
*/
- public GridDhtAffinityAssignmentResponse(int cacheId,
+ public GridDhtAffinityAssignmentResponse(
+ long futId,
+ int cacheId,
@NotNull AffinityTopologyVersion topVer,
List<List<ClusterNode>> affAssignment) {
+ this.futId = futId;
this.cacheId = cacheId;
this.topVer = topVer;
affAssignmentIds = ids(affAssignment);
}
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
/** {@inheritDoc} */
@Override public boolean partitionExchangeMessage() {
return true;
@@ -181,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/**
@@ -239,12 +253,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
writer.incrementState();
case 4:
- if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
case 5:
+ if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -275,7 +295,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 4:
- idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -283,6 +303,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
case 5:
+ idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 4f94ae2..741ca5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -32,12 +33,11 @@ import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -48,9 +48,6 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF
* Future that fetches affinity assignment from remote cache nodes.
*/
public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -58,6 +55,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private static IgniteLogger log;
/** */
+ private static final AtomicLong idGen = new AtomicLong();
+
+ /** */
private final GridCacheSharedContext ctx;
/** List of available nodes this future can fetch data from. */
@@ -68,26 +68,33 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
private ClusterNode pendingNode;
/** */
- @GridToStringInclude
- private final T2<Integer, AffinityTopologyVersion> key;
+ private final long id;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private final int cacheId;
/**
* @param ctx Context.
- * @param cacheName Cache name.
+ * @param cacheDesc Cache descriptor.
* @param topVer Topology version.
* @param discoCache Discovery cache.
*/
public GridDhtAssignmentFetchFuture(
GridCacheSharedContext ctx,
- String cacheName,
+ DynamicCacheDescriptor cacheDesc,
AffinityTopologyVersion topVer,
DiscoCache discoCache
) {
+ this.topVer = topVer;
+ this.cacheId = cacheDesc.cacheId();
this.ctx = ctx;
- int cacheId = CU.cacheId(cacheName);
- this.key = new T2<>(cacheId, topVer);
- Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheId);
+ id = idGen.getAndIncrement();
+
+ Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
LinkedList<ClusterNode> tmp = new LinkedList<>();
@@ -105,19 +112,26 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
}
/**
- * Initializes fetch future.
+ * @return Cache ID.
*/
- public void init() {
- ctx.affinity().addDhtAssignmentFetchFuture(this);
+ public int cacheId() {
+ return cacheId;
+ }
- requestFromNextNode();
+ /**
+ * @return Future ID.
+ */
+ public long id() {
+ return id;
}
/**
- * @return Future key.
+ * Initializes fetch future.
*/
- public T2<Integer, AffinityTopologyVersion> key() {
- return key;
+ public void init() {
+ ctx.affinity().addDhtAssignmentFetchFuture(this);
+
+ requestFromNextNode();
}
/**
@@ -125,14 +139,6 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
* @param res Response.
*/
public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- if (!res.topologyVersion().equals(key.get2())) {
- if (log.isDebugEnabled())
- log.debug("Received affinity assignment for wrong topology version (will ignore) " +
- "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
-
- return;
- }
-
GridDhtAffinityAssignmentResponse res0 = null;
synchronized (this) {
@@ -188,7 +194,8 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() +
", node=" + node + ']');
- ctx.io().send(node, new GridDhtAffinityAssignmentRequest(key.get1(), key.get2()),
+ ctx.io().send(node,
+ new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
AFFINITY_POOL);
// Close window for listener notification.
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 4e699b3..8e79eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -393,7 +393,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
int num = cctx.affinity().partitions();
if (cctx.rebalanceEnabled()) {
- boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
+ boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom());
boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
@@ -541,7 +541,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
cntrMap.clear();
// If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+ if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -1156,10 +1156,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// If for some nodes current partition has a newer map,
// then we keep the newer value.
if (newPart != null &&
- (newPart.updateSequence() < part.updateSequence() || (
- cctx.startTopologyVersion() != null &&
- newPart.topologyVersion() != null && // Backward compatibility.
- cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+ (newPart.updateSequence() < part.updateSequence() ||
+ (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
) {
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1169,7 +1167,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- //remove entry if node left
+ // Remove entry if node left.
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 6fb7df6..579796d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -122,6 +122,8 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
boolean keepBinary,
boolean skipStore
) {
+ assert topVer.topologyVersion() > 0 : topVer;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futId = futId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 654d306..b4cb3c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -59,8 +59,8 @@ import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
-import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -110,9 +110,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
- /** */
- private static final long serialVersionUID = 0L;
-
/** Dummy flag. */
private final boolean dummy;
@@ -190,8 +187,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Logger. */
private final IgniteLogger log;
- /** Dynamic cache change requests. */
- private Collection<DynamicCacheChangeRequest> reqs;
+ /** Cache change requests. */
+ private ExchangeActions exchActions;
/** */
private CacheAffinityChangeMessage affChangeMsg;
@@ -284,19 +281,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cctx Cache context.
* @param busyLock Busy lock.
* @param exchId Exchange ID.
- * @param reqs Cache change requests.
+ * @param exchActions Cache change requests.
* @param affChangeMsg Affinity change message.
*/
public GridDhtPartitionsExchangeFuture(
GridCacheSharedContext cctx,
ReadWriteLock busyLock,
GridDhtPartitionExchangeId exchId,
- Collection<DynamicCacheChangeRequest> reqs,
+ ExchangeActions exchActions,
CacheAffinityChangeMessage affChangeMsg
) {
assert busyLock != null;
assert exchId != null;
assert exchId.topologyVersion() != null;
+ assert exchActions == null || !exchActions.empty();
dummy = false;
forcePreload = false;
@@ -305,7 +303,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
this.cctx = cctx;
this.busyLock = busyLock;
this.exchId = exchId;
- this.reqs = reqs;
+ this.exchActions = exchActions;
this.affChangeMsg = affChangeMsg;
log = cctx.logger(getClass());
@@ -317,10 +315,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @param reqs Cache change requests.
+ * @param exchActions Exchange actions.
*/
- public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
- this.reqs = reqs;
+ public void exchangeActions(ExchangeActions exchActions) {
+ assert exchActions == null || !exchActions.empty() : exchActions;
+ assert evtLatch != null && evtLatch.getCount() == 1L : this;
+
+ this.exchActions = exchActions;
}
/**
@@ -379,33 +380,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
* @param cacheId Cache ID to check.
- * @param topVer Topology version.
+ * @param rcvdFrom ID of the node the cache was received from.
* @return {@code True} if cache was added during this exchange.
*/
- public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
- if (cacheStarted(cacheId))
- return true;
-
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
+ public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
+ return dynamicCacheStarted(cacheId) || (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom));
}
/**
* @param cacheId Cache ID.
* @return {@code True} if non-client cache was added during this exchange.
*/
- public boolean cacheStarted(int cacheId) {
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.start() && !req.clientStartOnly()) {
- if (CU.cacheId(req.cacheName()) == cacheId)
- return true;
- }
- }
- }
-
- return false;
+ public boolean dynamicCacheStarted(int cacheId) {
+ return exchActions != null && exchActions.cacheStarted(cacheId);
}
/**
@@ -435,14 +422,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*
*/
public ClusterState newClusterState() {
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.globalStateChange())
- return req.state();
- }
- }
-
- return null;
+ return exchActions != null ? exchActions.newClusterState() : null;
}
/**
@@ -524,7 +504,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage();
if (msg instanceof DynamicCacheChangeBatch){
- assert !F.isEmpty(reqs);
+ assert exchActions != null && !exchActions.empty();
exchange = onCacheChangeRequest(crdNode);
}
@@ -540,10 +520,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
else {
if (discoEvt.type() == EVT_NODE_JOINED) {
- Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(topVer);
+ if (!discoEvt.eventNode().isLocal()) {
+ Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
+ discoEvt.eventNode().id(),
+ topVer);
- if (!discoEvt.eventNode().isLocal())
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
+ }
+ else
+ cctx.cache().startCachesOnLocalJoin(topVer);
}
exchange = CU.clientNode(discoEvt.eventNode()) ?
@@ -553,20 +538,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
updateTopologies(crdNode);
- if (!F.isEmpty(reqs)) {
- boolean hasStop = false;
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.stop()) {
- hasStop = true;
-
- break;
- }
- }
-
- if (hasStop)
- cctx.cache().context().database().beforeCachesStop();
- }
+ if (exchActions != null && exchActions.hasStop())
+ cctx.cache().context().database().beforeCachesStop();
switch (exchange) {
case ALL: {
@@ -654,8 +627,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridDhtPartitionTopology top = cacheCtx.topology();
if (crd) {
- boolean updateTop = !cacheCtx.isLocal() &&
- exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
if (updateTop && clientTop != null)
top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
@@ -674,32 +646,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @throws IgniteCheckedException If failed.
*/
private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
- assert !F.isEmpty(reqs) : this;
+ assert exchActions != null && !exchActions.empty() : this;
GridClusterStateProcessor stateProc = cctx.kernalContext().state();
- if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(reqs, topologyVersion())) {
+ if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) {
changeGlobalStateE = stateProc.onChangeGlobalState();
if (crd && changeGlobalStateE != null)
changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE);
}
- boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
-
- if (clientOnly) {
- boolean clientCacheStarted = false;
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
- clientCacheStarted = true;
+ boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
- break;
- }
- }
-
- return clientCacheStarted ? ExchangeType.CLIENT : ExchangeType.NONE;
- }
+ if (clientOnly)
+ return exchActions.clientCacheStarted(cctx.localNodeId()) ? ExchangeType.CLIENT : ExchangeType.NONE;
else
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
@@ -768,7 +729,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
private void clientOnlyExchange() throws IgniteCheckedException {
clientOnlyExchange = true;
- //todo checl invoke on client
if (crd != null) {
if (crd.isLocal()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1046,19 +1006,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @return {@code True} if cache is stopping by this exchange.
*/
public boolean stopping(int cacheId) {
- boolean stopping = false;
-
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (cacheId == CU.cacheId(req.cacheName())) {
- stopping = req.stop();
-
- break;
- }
- }
- }
-
- return stopping;
+ return exchActions != null && exchActions.cacheStopped(cacheId);
}
/**
@@ -1069,13 +1017,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert node != null;
// Reset lost partition before send local partition to coordinator.
- if (!F.isEmpty(reqs)) {
- Set<String> caches = new HashSet<>();
-
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.resetLostPartitions())
- caches.add(req.cacheName());
- }
+ if (exchActions != null) {
+ Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
resetLostPartitions(caches);
@@ -1203,14 +1146,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cacheValidRes = m;
}
- cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
+ cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err);
cctx.exchange().onExchangeDone(this, err);
- if (!F.isEmpty(reqs) && err == null) {
- for (DynamicCacheChangeRequest req : reqs)
- cctx.cache().completeStartFuture(req);
- }
+ if (exchActions != null && err == null)
+ exchActions.completeRequestFutures(cctx);
if (exchangeOnChangeGlobalState && err == null)
cctx.kernalContext().state().onExchangeDone();
@@ -1227,7 +1168,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
}
- reqs = null;
+ exchActions = null;
if (discoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)discoEvt).customMessage(null);
@@ -1615,20 +1556,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert discoEvt instanceof DiscoveryCustomEvent;
if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt)
- .customMessage();
+ if (exchActions != null) {
+ if (exchActions.newClusterState() == ClusterState.ACTIVE)
+ assignPartitionsStates();
- Set<String> caches = new HashSet<>();
+ Set<String> caches = exchActions.cachesToResetLostPartitions();
- for (DynamicCacheChangeRequest req : batch.requests()) {
- if (req.resetLostPartitions())
- caches.add(req.cacheName());
- else if (req.globalStateChange() && req.state() != ClusterState.INACTIVE)
- assignPartitionsStates();
+ if (!F.isEmpty(caches))
+ resetLostPartitions(caches);
}
-
- if (!F.isEmpty(caches))
- resetLostPartitions(caches);
}
}
else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1b96e..57616ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -608,7 +608,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
AffinityAssignment assignment = cctx.affinity().assignment(topVer);
- GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+ GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
+ req.futureId(),
+ cctx.cacheId(),
topVer,
assignment.assignment());
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 5e3dc3f..8b538ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -146,35 +146,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter);
try {
- for (KeyCacheObject key : keys) {
- while (true) {
- GridLocalCacheEntry entry = null;
-
- try {
- entry = entryExx(key);
-
- entry.unswap(false);
-
- if (!ctx.isAll(entry, filter)) {
- fut.onFailed();
-
- return fut;
- }
-
- // Removed exception may be thrown here.
- GridCacheMvccCandidate cand = fut.addEntry(entry);
-
- if (cand == null && fut.isDone())
- return fut;
-
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- if (log().isDebugEnabled())
- log().debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
- }
- }
- }
+ if (!fut.addEntries(keys))
+ return fut;
if (!ctx.mvcc().addFuture(fut))
fut.onError(new IgniteCheckedException("Duplicate future ID (internal error): " + fut));
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 59d0adb..9641533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -144,12 +144,51 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridLocalLockFuture.class);
+ }
+
+ /**
+ * @param keys Keys.
+ * @return {@code False} in case of error.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean addEntries(Collection<KeyCacheObject> keys) throws IgniteCheckedException {
+ for (KeyCacheObject key : keys) {
+ while (true) {
+ GridLocalCacheEntry entry = null;
+
+ try {
+ entry = cache.entryExx(key);
+
+ entry.unswap(false);
+
+ if (!cctx.isAll(entry, filter)) {
+ onFailed();
+
+ return false;
+ }
+
+ // Removed exception may be thrown here.
+ GridCacheMvccCandidate cand = addEntry(entry);
+
+ if (cand == null && isDone())
+ return false;
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+ }
+ }
+ }
if (timeout > 0) {
timeoutObj = new LockTimeoutObject();
cctx.time().addTimeoutObject(timeoutObj);
}
+
+ return true;
}
/** {@inheritDoc} */
@@ -216,7 +255,7 @@ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Bool
* @return Lock candidate.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
+ private @Nullable GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
throws GridCacheEntryRemovedException {
// Add local lock first, as it may throw GridCacheEntryRemovedException.
GridCacheMvccCandidate c = entry.addLocal(
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0f7b0df..fcf534c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -272,8 +272,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
qryTopVer = cctx.startTopologyVersion();
- if (qryTopVer == null)
- qryTopVer = new AffinityTopologyVersion(cctx.localNode().order(), 0);
+ assert qryTopVer != null : cctx.name();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index efb02c6..e7706dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -561,16 +561,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
- * Wait topology.
+ * @param ctx Context.
+ * @throws IgniteCheckedException In case of error.
*/
- public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+ void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
GridCacheContext<K, V> cctx = cacheContext(ctx);
if (!cctx.isLocal()) {
- cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+ AffinityTopologyVersion topVer = initTopVer;
+
+ cacheContext(ctx).affinity().affinityReadyFuture(topVer).get();
for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
- getOrCreatePartitionRecovery(ctx, partId);
+ getOrCreatePartitionRecovery(ctx, partId, topVer);
}
}
@@ -736,7 +739,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
}
- PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+ PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition(), e.topologyVersion());
return rec.collectEntries(e, cctx, cache);
}
@@ -869,37 +872,40 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
/**
* @param ctx Context.
* @param partId Partition id.
+ * @param topVer Topology version for current operation.
* @return Partition recovery.
*/
- @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+ @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx,
+ int partId,
+ AffinityTopologyVersion topVer) {
+ assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
PartitionRecovery rec = rcvs.get(partId);
if (rec == null) {
T2<Long, Long> partCntrs = null;
- AffinityTopologyVersion initTopVer0 = initTopVer;
+ Map<UUID, Map<Integer, T2<Long, Long>>> initUpdCntrsPerNode = this.initUpdCntrsPerNode;
- if (initTopVer0 != null) {
+ if (initUpdCntrsPerNode != null) {
GridCacheContext<K, V> cctx = cacheContext(ctx);
GridCacheAffinityManager aff = cctx.affinity();
- if (initUpdCntrsPerNode != null) {
- for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
- Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
+ for (ClusterNode node : aff.nodesByPartition(partId, topVer)) {
+ Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
- if (map != null) {
- partCntrs = map.get(partId);
+ if (map != null) {
+ partCntrs = map.get(partId);
- break;
- }
+ break;
}
}
- else if (initUpdCntrs != null)
- partCntrs = initUpdCntrs.get(partId);
}
+ else if (initUpdCntrs != null)
+ partCntrs = initUpdCntrs.get(partId);
- rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0,
+ rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
partCntrs != null ? partCntrs.get2() : null);
PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 8377754..acf351f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -340,8 +341,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
updateCntr,
topVer);
- CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
- cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+ IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name());
+
+ assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() +
+ ", locStart=" + cctx.startTopologyVersion() +
+ ", locNode=" + cctx.localNode() +
+ ", stopping=" + cctx.kernalContext().isStopping();
+
+ CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(jcache, cctx, e0);
lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 1286ba9..b25b229 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
@@ -279,9 +280,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest(
- requestId, null, ctx.localNodeId());
-
- changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE);
+ requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId());
reqs.add(changeGlobalStateReq);
@@ -329,26 +328,25 @@ public class GridClusterStateProcessor extends GridProcessorAdapter {
}
/**
- * @param reqs Requests.
+ * @param exchActions Exchange actions.
+ * @param topVer Exchange topology version.
*/
public boolean changeGlobalState(
- Collection<DynamicCacheChangeRequest> reqs,
+ ExchangeActions exchActions,
AffinityTopologyVersion topVer
) {
- assert !F.isEmpty(reqs);
+ assert exchActions != null;
assert topVer != null;
- for (DynamicCacheChangeRequest req : reqs)
- if (req.globalStateChange()) {
- ChangeGlobalStateContext cgsCtx = lastCgsCtx;
-
- assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray());
+ if (exchActions.newClusterState() != null) {
+ ChangeGlobalStateContext cgsCtx = lastCgsCtx;
- cgsCtx.topologyVersion(topVer);
+ assert cgsCtx != null : exchActions;
- return true;
- }
+ cgsCtx.topologyVersion(topVer);
+ return true;
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 81f5c28..59c2656 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -427,7 +427,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
*/
protected final void unregisterMBean() throws IgniteSpiException {
// Unregister SPI MBean.
- if (spiMBean != null) {
+ if (spiMBean != null && ignite != null) {
MBeanServer jmx = ignite.configuration().getMBeanServer();
assert jmx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 96df255..803beed 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -188,14 +188,15 @@ public class DiscoveryDataBag {
}
/**
- *
+ * @return ID of joining node.
*/
public UUID joiningNodeId() {
return joiningNodeId;
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
+ * @return Discovery data for given component.
*/
public GridDiscoveryData gridDiscoveryData(int cmpId) {
if (gridData == null)
@@ -207,7 +208,8 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
+ * @return Joining node discovery data.
*/
public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) {
if (newJoinerData == null)
@@ -219,7 +221,7 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
* @param data Data.
*/
public void addJoiningNodeData(Integer cmpId, Serializable data) {
@@ -227,7 +229,7 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
* @param data Data.
*/
public void addGridCommonData(Integer cmpId, Serializable data) {
@@ -235,7 +237,7 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
* @param data Data.
*/
public void addNodeSpecificData(Integer cmpId, Serializable data) {
@@ -246,7 +248,8 @@ public class DiscoveryDataBag {
}
/**
- * @param cmpId component ID.
+ * @param cmpId Component ID.
+ * @return {@code True} if common data collected for given component.
*/
public boolean commonDataCollectedFor(Integer cmpId) {
assert cmnDataInitializedCmps != null;
@@ -295,5 +298,4 @@ public class DiscoveryDataBag {
@Nullable public Map<Integer, Serializable> localNodeSpecificData() {
return nodeSpecificData.get(DEFAULT_KEY);
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index cc581e1..2a55412 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1981,7 +1981,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
*
*/
void printStopInfo() {
- if (log.isDebugEnabled())
+ IgniteLogger log = this.log;
+
+ if (log != null && log.isDebugEnabled())
log.debug(stopInfo());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/config/examples.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/examples.properties b/modules/core/src/test/config/examples.properties
index c584f72..ea0d8ed 100644
--- a/modules/core/src/test/config/examples.properties
+++ b/modules/core/src/test/config/examples.properties
@@ -22,3 +22,4 @@ ScalarCacheExample=examples/config/example-ignite.xml
ScalarCacheQueryExample=examples/config/example-ignite.xml
ScalarCountGraphTrianglesExample=examples/config/example-ignite.xml
ScalarPopularNumbersRealTimeExample=examples/config/example-ignite.xml
+MemoryPolicyExample=examples/config/example-memory-policies.xml
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
index 2110c28..99e80ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -78,13 +78,13 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
Map<String, Integer> backupAssignedAttribute = getAttributeStatistic(assigned);
- String nodeAttributeValue = node.attribute(SPLIT_ATTRIBUTE_NAME);
+ String nodeAttributeVal = node.attribute(SPLIT_ATTRIBUTE_NAME);
- if (FIRST_NODE_GROUP.equals(nodeAttributeValue)
+ if (FIRST_NODE_GROUP.equals(nodeAttributeVal)
&& backupAssignedAttribute.get(FIRST_NODE_GROUP) < 2)
return true;
- return backupAssignedAttribute.get(nodeAttributeValue).equals(0);
+ return backupAssignedAttribute.get(nodeAttributeVal).equals(0);
}
};
@@ -107,10 +107,11 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
String val = assignedNode.attribute(SPLIT_ATTRIBUTE_NAME);
- Integer count = backupAssignedAttribute.get(val);
+ Integer cnt = backupAssignedAttribute.get(val);
- backupAssignedAttribute.put(val, count + 1);
+ backupAssignedAttribute.put(val, cnt + 1);
}
+
return backupAssignedAttribute;
}
@@ -157,6 +158,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
*/
public void testPartitionDistribution() throws Exception {
backups = 1;
+
try {
for (int i = 0; i < 3; i++) {
splitAttrVal = "A";
@@ -205,6 +207,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
*/
public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
backups = 3;
+
try {
for (int i = 0; i < 2; i++) {
splitAttrVal = FIRST_NODE_GROUP;
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
index 13fae24..b6114d1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridNodeMetricsLogSelfTest.java
@@ -35,12 +35,6 @@ import org.apache.log4j.WriterAppender;
@SuppressWarnings({"ProhibitedExceptionDeclared"})
@GridCommonTest(group = "Kernal")
public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
- /** */
-
- public GridNodeMetricsLogSelfTest() {
- super(false);
- }
-
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -51,6 +45,7 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
@@ -80,11 +75,11 @@ public class GridNodeMetricsLogSelfTest extends GridCommonAbstractTest {
cache2.put(2, "two");
- Thread.sleep(10000);
+ Thread.sleep(10_000);
//Check that nodes are alie
- assert cache1.get(1).equals("one");
- assert cache2.get(2).equals("two");
+ assertEquals("one", cache1.get(1));
+ assertEquals("two", cache2.get(2));
String fullLog = strWr.toString();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index 7dce36b..ae9986d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
+import javax.cache.configuration.Factory;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.integration.CacheLoaderException;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -169,18 +171,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
cacheCfg.setRebalanceMode(SYNC);
- if (igniteInstanceName.endsWith("1"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_1));
- else if (igniteInstanceName.endsWith("2"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_2));
- else if (igniteInstanceName.endsWith("3"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_3));
- else if (igniteInstanceName.endsWith("4"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_4));
- else if (igniteInstanceName.endsWith("5"))
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_5));
- else
- cacheCfg.setCacheStoreFactory(singletonFactory(LOCAL_STORE_6));
+ cacheCfg.setCacheStoreFactory(new StoreFactory());
cacheCfg.setWriteThrough(true);
cacheCfg.setReadThrough(true);
@@ -840,4 +831,30 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
map.clear();
}
}
+
+ /**
+ *
+ */
+ static class StoreFactory implements Factory<CacheStore> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite node;
+
+ @Override public CacheStore create() {
+ String igniteInstanceName = node.configuration().getIgniteInstanceName();
+
+ if (igniteInstanceName.endsWith("1"))
+ return LOCAL_STORE_1;
+ else if (igniteInstanceName.endsWith("2"))
+ return LOCAL_STORE_2;
+ else if (igniteInstanceName.endsWith("3"))
+ return LOCAL_STORE_3;
+ else if (igniteInstanceName.endsWith("4"))
+ return LOCAL_STORE_4;
+ else if (igniteInstanceName.endsWith("5"))
+ return LOCAL_STORE_5;
+ else
+ return LOCAL_STORE_6;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
index a4a831f..546ec06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingNearErrorTest.java
@@ -44,13 +44,13 @@ public class IgniteCacheP2pUnmarshallingNearErrorTest extends IgniteCacheP2pUnma
/** {@inheritDoc} */
@Override public void testResponseMessageOnUnmarshallingFailed() throws InterruptedException {
- //GridCacheEvictionRequest unmarshalling failed test
- readCnt.set(5); //2 for each put
+ //GridCacheEvictionRequest unmarshalling failed test.
+ readCnt.set(5); //2 for each put.
jcache(0).put(new TestKey(String.valueOf(++key)), "");
jcache(0).put(new TestKey(String.valueOf(++key)), "");
- //Eviction request unmarshalling failed but ioManager does not hangs up.
+ // Eviction request unmarshalling failed but ioManager does not hang up.
// Wait for eviction complete.
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
new file mode 100644
index 0000000..eb8077f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCacheStartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String CACHE_NAME = "c1";
+
+ /** */
+ private boolean client;
+
+ /** */
+ private CacheConfiguration ccfg;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ if (ccfg != null)
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void testStartAndNodeJoin() throws Exception {
+ Ignite node0 = startGrid(0);
+
+ checkCache(0, CACHE_NAME, false);
+
+ node0.createCache(cacheConfiguration(CACHE_NAME));
+
+ checkCache(0, CACHE_NAME, true);
+
+ startGrid(1);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+
+ client = true;
+
+ startGrid(2);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+
+ ignite(2).destroyCache(CACHE_NAME);
+
+ checkCache(0, CACHE_NAME, false);
+ checkCache(1, CACHE_NAME, false);
+ checkCache(2, CACHE_NAME, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartFromJoiningNode1() throws Exception {
+ checkStartFromJoiningNode(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartFromJoiningNode2() throws Exception {
+ checkStartFromJoiningNode(true);
+ }
+
+ /**
+ * @param joinClient {@code True} if client node joins.
+ * @throws Exception If failed.
+ */
+ private void checkStartFromJoiningNode(boolean joinClient) throws Exception {
+ startGrid(0);
+ startGrid(1);
+
+ client = true;
+
+ startGrid(2);
+
+ ccfg = cacheConfiguration(CACHE_NAME);
+ client = joinClient;
+
+ startGrid(3);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+ checkCache(3, CACHE_NAME, true);
+
+ client = false;
+ ccfg = null;
+
+ startGrid(4);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+ checkCache(3, CACHE_NAME, true);
+ checkCache(4, CACHE_NAME, true);
+
+ client = true;
+
+ startGrid(5);
+
+ checkCache(0, CACHE_NAME, true);
+ checkCache(1, CACHE_NAME, true);
+ checkCache(2, CACHE_NAME, false);
+ checkCache(3, CACHE_NAME, true);
+ checkCache(4, CACHE_NAME, true);
+ checkCache(5, CACHE_NAME, false);
+
+ ignite(5).destroyCache(CACHE_NAME);
+
+ for (int i = 0; i < 5; i++)
+ checkCache(i, CACHE_NAME, false);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String cacheName) {
+ return new CacheConfiguration(cacheName);
+ }
+
+ /**
+ * @param idx Node index.
+ * @param cacheName Cache name.
+ * @param expCache {@code True} if cache should be created.
+ */
+ private void checkCache(int idx, final String cacheName, final boolean expCache) throws IgniteInterruptedCheckedException {
+ final IgniteKernal node = (IgniteKernal)ignite(idx);
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return expCache == (node.context().cache().cache(cacheName) != null);
+ }
+ }, 1000));
+
+ assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 4a34a1d..e7c5ca5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -1027,7 +1027,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
* @param nearOnly Near only flag.
* @throws Exception If failed.
*/
- public void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
+ private void checkGetOrCreateNear(final boolean nearOnly) throws Exception {
try {
final AtomicInteger cnt = new AtomicInteger(nodeCount());
final AtomicReference<Throwable> err = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
index fd77309..057b0d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java
@@ -130,7 +130,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
/**
* Tests topology split scenario.
- * @throws Exception
+ *
+ * @throws Exception If failed.
*/
public void testTopologyValidator() throws Exception {
assertTrue(initLatch.await(10, TimeUnit.SECONDS));
@@ -242,12 +243,15 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
/** */
private static final long serialVersionUID = 0L;
+ /** */
@CacheNameResource
private String cacheName;
+ /** */
@IgniteInstanceResource
private Ignite ignite;
+ /** */
@LoggerResource
private IgniteLogger log;
@@ -263,7 +267,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac
}).isEmpty())
return false;
- IgniteKernal kernal = (IgniteKernal)ignite.cache(cacheName).unwrap(Ignite.class);
+ IgniteKernal kernal = (IgniteKernal)ignite;
GridDhtCacheAdapter<Object, Object> dht = kernal.context().cache().internalCache(cacheName).context().dht();
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
new file mode 100644
index 0000000..a80830a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDiscoveryDataConcurrentJoinTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.GridAtomicInteger;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class CacheDiscoveryDataConcurrentJoinTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of test iterations. */
+ private static final int ITERATIONS = 3;
+
+ /** */
+ private boolean client;
+
+ /** */
+ private ThreadLocal<Integer> staticCaches = new ThreadLocal<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+ /** */
+ private boolean delay = true;
+
+ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (getTestIgniteInstanceName(0).equals(ignite.name())) {
+ if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+ TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
+
+ if (delay) {
+ log.info("Delay join processing: " + msg0);
+
+ delay = false;
+
+ doSleep(5000);
+ }
+ }
+ }
+
+ super.startMessageProcess(msg);
+ }
+ };
+
+ testSpi.setIpFinder(ipFinder);
+ testSpi.setJoinTimeout(60_000);
+
+ cfg.setDiscoverySpi(testSpi);
+
+ cfg.setClientMode(client);
+
+ Integer caches = staticCaches.get();
+
+ if (caches != null) {
+ cfg.setCacheConfiguration(cacheConfigurations(caches).toArray(new CacheConfiguration[caches]));
+
+ staticCaches.remove();
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60 * 1000L;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentJoin() throws Exception {
+ for (int iter = 0; iter < ITERATIONS; iter++) {
+ log.info("Iteration: " + iter);
+
+ final int NODES = 6;
+ final int MAX_CACHES = 10;
+
+ final GridAtomicInteger caches = new GridAtomicInteger();
+
+ startGrid(0);
+
+ final AtomicInteger idx = new AtomicInteger(1);
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int c = ThreadLocalRandom.current().nextInt(MAX_CACHES) + 1;
+
+ staticCaches.set(c);
+
+ startGrid(idx.getAndIncrement());
+
+ caches.setIfGreater(c);
+
+ return null;
+ }
+ }, NODES - 1, "start-node");
+
+ assertTrue(caches.get() > 0);
+
+ for (int i = 0; i < NODES; i++) {
+ Ignite node = ignite(i);
+
+ for (int c = 0; c < caches.get(); c++) {
+ Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
+
+ assertEquals(NODES, nodes.size());
+
+ checkCache(node, "cache-" + c);
+ }
+ }
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @param caches Number of caches.
+ * @return Cache configurations.
+ */
+ private Collection<CacheConfiguration> cacheConfigurations(int caches) {
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ for (int i = 0; i < caches; i++)
+ ccfgs.add(cacheConfiguration("cache-" + i));
+
+ return ccfgs;
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String cacheName) {
+ CacheConfiguration ccfg = new CacheConfiguration(cacheName);
+
+ ccfg.setName(cacheName);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+ return ccfg;
+ }
+ /**
+ * @param node Node.
+ * @param cacheName Cache name.
+ */
+ private void checkCache(Ignite node, final String cacheName) {
+ assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index fed388a..bc435e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2165,7 +2165,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
Map<String, List<List<ClusterNode>>> aff = new HashMap<>();
for (Ignite node : nodes) {
- log.info("Check node: " + node.name());
+ log.info("Check affinity [node=" + node.name() + ", topVer=" + topVer + ", expIdeal=" + expIdeal + ']');
IgniteKernal node0 = (IgniteKernal)node;
@@ -2175,7 +2175,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
fut.get();
for (GridCacheContext cctx : node0.context().cache().context().cacheContexts()) {
- if (cctx.startTopologyVersion() != null && cctx.startTopologyVersion().compareTo(topVer) > 0)
+ if (cctx.startTopologyVersion().compareTo(topVer) > 0)
continue;
List<List<ClusterNode>> aff1 = aff.get(cctx.name());
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
index 321faf8..88df607 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -210,6 +211,8 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
assertEquals(NODES, nodes.size());
+
+ checkCache(node, "cache-" + c);
}
for (int c = 0; c < 5; c++) {
@@ -247,4 +250,11 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
return ccfg;
}
+ /**
+ * @param node Node.
+ * @param cacheName Cache name.
+ */
+ private void checkCache(Ignite node, final String cacheName) {
+ assertNotNull(((IgniteKernal)node).context().cache().cache(cacheName));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index 4864a67..bf5ba61 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -26,6 +26,7 @@ import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -357,9 +359,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
*
*/
private static class FirstStoreFactory implements Factory<CacheStore> {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
/** {@inheritDoc} */
@Override public CacheStore create() {
- String igniteInstanceName = startingIgniteInstanceName.get();
+ String igniteInstanceName = ignite.name();
CacheStore store = firstStores.get(igniteInstanceName);
@@ -374,9 +379,12 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
*
*/
private static class SecondStoreFactory implements Factory<CacheStore> {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
/** {@inheritDoc} */
@Override public CacheStore create() {
- String igniteInstanceName = startingIgniteInstanceName.get();
+ String igniteInstanceName = ignite.name();
CacheStore store = secondStores.get(igniteInstanceName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
index 372da32..a7128e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedBasicStoreMultiNodeSelfTest.java
@@ -22,7 +22,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.cache.configuration.Factory;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
@@ -102,11 +104,7 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
cc.setAtomicityMode(TRANSACTIONAL);
cc.setBackups(1);
- GridCacheTestStore store = new GridCacheTestStore();
-
- stores.add(store);
-
- cc.setCacheStoreFactory(singletonFactory(store));
+ cc.setCacheStoreFactory(new StoreFactory());
cc.setReadThrough(true);
cc.setWriteThrough(true);
cc.setLoadPreviousValue(true);
@@ -269,4 +267,18 @@ public class GridCachePartitionedBasicStoreMultiNodeSelfTest extends GridCommonA
assertEquals(expPutAll, putAll);
assertEquals(expTxs, txs);
}
+
+ /**
+ * Store factory: every {@code create()} call builds a new {@link GridCacheTestStore}, adds it to {@code stores} and returns it.
+ */
+ static class StoreFactory implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ GridCacheTestStore store = new GridCacheTestStore();
+
+ stores.add(store);
+
+ return store;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/86002002/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 0f4aa87..2096179 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -18,8 +18,10 @@
package org.apache.ignite.loadtests.hashmap;
import java.util.IdentityHashMap;
+import java.util.UUID;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheType;
@@ -78,6 +80,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
),
defaultCacheConfiguration(),
CacheType.USER,
+ AffinityTopologyVersion.ZERO,
+ UUID.randomUUID(),
true,
true,
null,