You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/22 15:13:15 UTC
[47/50] [abbrv] ignite git commit: Merge remote-tracking branch 'remotes/community/ignite-5075' into ignite-5075-pds
Merge remote-tracking branch 'remotes/community/ignite-5075' into ignite-5075-pds
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08404350
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08404350
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08404350
Branch: refs/heads/ignite-5075-pds
Commit: 08404350a6cfbf841638a40e5b96823bc06f87e3
Parents: 09e0bc2 db7a809
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 22 12:58:11 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 22 12:58:11 2017 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 13 +
.../managers/communication/GridIoManager.java | 10 +-
.../internal/managers/discovery/DiscoCache.java | 30 +-
.../discovery/GridDiscoveryManager.java | 144 +-
.../affinity/GridAffinityAssignmentCache.java | 38 +-
.../affinity/GridAffinityProcessor.java | 2 +-
.../processors/affinity/GridAffinityUtils.java | 2 +-
.../cache/CacheAffinitySharedManager.java | 604 ++---
.../CacheClientReconnectDiscoveryData.java | 62 +-
.../internal/processors/cache/CacheData.java | 18 +-
.../processors/cache/CacheGroupData.java | 142 ++
.../processors/cache/CacheGroupDescriptor.java | 210 ++
.../cache/CacheGroupInfrastructure.java | 783 ++++++
.../processors/cache/CacheMetricsImpl.java | 10 +-
.../cache/CacheNodeCommonDiscoveryData.java | 33 +
.../processors/cache/ClusterCachesInfo.java | 364 ++-
.../cache/ClusterCachesReconnectResult.java | 75 +
.../cache/DynamicCacheDescriptor.java | 26 +
.../processors/cache/ExchangeActions.java | 86 +-
.../processors/cache/GridCacheAdapter.java | 51 +-
.../cache/GridCacheAffinityManager.java | 64 +-
.../processors/cache/GridCacheAttributes.java | 14 +
.../cache/GridCacheClearAllRunnable.java | 2 +-
.../cache/GridCacheConcurrentMap.java | 22 +-
.../cache/GridCacheConcurrentMapImpl.java | 132 +-
.../processors/cache/GridCacheContext.java | 145 +-
.../processors/cache/GridCacheEntryInfo.java | 36 +-
.../processors/cache/GridCacheEventManager.java | 36 -
.../cache/GridCacheGroupIdMessage.java | 110 +
.../processors/cache/GridCacheIdMessage.java | 117 +
.../processors/cache/GridCacheIoManager.java | 288 ++-
.../cache/GridCacheLocalConcurrentMap.java | 42 +-
.../processors/cache/GridCacheMapEntry.java | 18 +-
.../processors/cache/GridCacheMessage.java | 97 +-
.../GridCachePartitionExchangeManager.java | 289 +--
.../processors/cache/GridCachePreloader.java | 23 +-
.../cache/GridCachePreloaderAdapter.java | 46 +-
.../processors/cache/GridCacheProcessor.java | 373 ++-
.../cache/GridCacheSharedContext.java | 4 +-
.../processors/cache/GridCacheTtlManager.java | 22 +-
.../processors/cache/GridCacheUtils.java | 42 +-
.../GridChangeGlobalStateMessageResponse.java | 20 +-
.../processors/cache/GridNoStorageCacheMap.java | 26 +-
.../cache/IgniteCacheOffheapManager.java | 122 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 1164 ++++++---
.../cache/affinity/GridCacheAffinityImpl.java | 9 +-
.../processors/cache/database/CacheDataRow.java | 5 -
.../cache/database/CacheDataRowAdapter.java | 47 +-
.../cache/database/CacheSearchRow.java | 5 +
.../processors/cache/database/RowStore.java | 16 +-
.../cache/database/tree/BPlusTree.java | 39 +-
.../cache/database/tree/io/PageIO.java | 24 +
.../distributed/GridCacheTtlUpdateRequest.java | 4 +-
.../distributed/GridDistributedBaseMessage.java | 4 +-
.../GridDistributedCacheAdapter.java | 11 +-
.../GridDistributedTxFinishResponse.java | 28 +-
.../dht/GridCachePartitionedConcurrentMap.java | 72 +-
.../dht/GridClientPartitionTopology.java | 25 +-
.../dht/GridDhtAffinityAssignmentRequest.java | 10 +-
.../dht/GridDhtAffinityAssignmentResponse.java | 12 +-
.../dht/GridDhtAssignmentFetchFuture.java | 22 +-
.../distributed/dht/GridDhtCacheAdapter.java | 338 ++-
.../distributed/dht/GridDhtCacheEntry.java | 6 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 8 +-
.../distributed/dht/GridDhtGetSingleFuture.java | 11 +-
.../distributed/dht/GridDhtLocalPartition.java | 366 ++-
.../distributed/dht/GridDhtLockResponse.java | 2 +-
.../dht/GridDhtPartitionTopology.java | 9 +-
.../dht/GridDhtPartitionTopologyImpl.java | 272 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 37 +-
.../dht/GridDhtTxFinishResponse.java | 14 +-
.../dht/GridDhtTxOnePhaseCommitAckRequest.java | 16 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 4 +-
.../dht/GridPartitionedGetFuture.java | 2 +-
.../dht/GridPartitionedSingleGetFuture.java | 2 +-
.../GridDhtAtomicAbstractUpdateRequest.java | 4 +-
.../dht/atomic/GridDhtAtomicCache.java | 60 +-
.../dht/atomic/GridDhtAtomicCacheEntry.java | 53 -
.../GridDhtAtomicDeferredUpdateResponse.java | 4 +-
.../dht/atomic/GridDhtAtomicNearResponse.java | 4 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 5 +-
.../GridNearAtomicAbstractUpdateRequest.java | 4 +-
.../GridNearAtomicCheckUpdateRequest.java | 4 +-
.../atomic/GridNearAtomicUpdateResponse.java | 4 +-
.../dht/colocated/GridDhtColocatedCache.java | 23 +-
.../colocated/GridDhtColocatedCacheEntry.java | 52 -
.../dht/preloader/GridDhtForceKeysFuture.java | 15 +-
.../dht/preloader/GridDhtForceKeysRequest.java | 4 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 8 +-
.../GridDhtPartitionDemandMessage.java | 12 +-
.../dht/preloader/GridDhtPartitionDemander.java | 208 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 57 +-
.../GridDhtPartitionSupplyMessage.java | 26 +-
.../GridDhtPartitionsAbstractMessage.java | 26 +-
.../GridDhtPartitionsExchangeFuture.java | 216 +-
.../preloader/GridDhtPartitionsFullMessage.java | 62 +-
.../GridDhtPartitionsSingleMessage.java | 41 +-
.../GridDhtPartitionsSingleRequest.java | 7 +-
.../dht/preloader/GridDhtPreloader.java | 410 +--
.../distributed/near/GridNearAtomicCache.java | 2 +-
.../distributed/near/GridNearCacheAdapter.java | 14 +-
.../distributed/near/GridNearGetRequest.java | 4 +-
.../distributed/near/GridNearGetResponse.java | 4 +-
.../near/GridNearSingleGetRequest.java | 4 +-
.../near/GridNearSingleGetResponse.java | 4 +-
.../near/GridNearTransactionalCache.java | 4 +-
.../near/GridNearTxFinishResponse.java | 14 +-
.../processors/cache/local/GridLocalCache.java | 15 +-
.../local/atomic/GridLocalAtomicCache.java | 4 +-
.../query/GridCacheDistributedQueryManager.java | 10 +-
.../cache/query/GridCacheQueryManager.java | 4 +-
.../cache/query/GridCacheQueryRequest.java | 3 +-
.../cache/query/GridCacheQueryResponse.java | 4 +-
.../CacheContinuousQueryBatchAck.java | 4 +-
.../continuous/CacheContinuousQueryHandler.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 7 +-
.../cache/transactions/IgniteTxEntry.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 26 +-
.../cache/transactions/TxLocksRequest.java | 20 +-
.../cache/transactions/TxLocksResponse.java | 28 +-
.../cluster/GridClusterStateProcessor.java | 4 +-
.../visor/cache/VisorCachePartitionsTask.java | 2 +-
.../CacheAtomicSingleMessageCountSelfTest.java | 2 +-
.../cache/CacheDeferredDeleteQueueTest.java | 2 +-
...cheDhtLocalPartitionAfterRemoveSelfTest.java | 2 +-
...CacheExchangeMessageDuplicatedStateTest.java | 54 +-
.../cache/CacheOffheapMapEntrySelfTest.java | 9 +-
.../GridCacheConditionalDeploymentSelfTest.java | 18 +
.../processors/cache/GridCacheLeakTest.java | 3 +-
.../GridCacheOrderedPreloadingSelfTest.java | 14 +-
.../cache/GridCacheTtlManagerSelfTest.java | 3 +-
.../processors/cache/IgniteCacheGroupsTest.java | 2406 ++++++++++++++++++
.../cache/IgniteCachePeekModesAbstractTest.java | 2 +-
.../processors/cache/IgniteCacheStartTest.java | 5 +-
.../cache/IgniteOnePhaseCommitInvokeTest.java | 4 +-
...niteTopologyValidatorGridSplitCacheTest.java | 8 +-
.../IgniteTxStoreExceptionAbstractSelfTest.java | 4 +-
.../GridCacheBinaryObjectsAbstractSelfTest.java | 2 +-
.../GridCacheQueueCleanupSelfTest.java | 13 +-
.../GridCacheSetAbstractSelfTest.java | 17 +-
.../GridCacheSetFailoverAbstractSelfTest.java | 6 +-
.../IgnitePartitionedQueueNoBackupsTest.java | 6 +-
.../IgnitePartitionedSetNoBackupsSelfTest.java | 6 +-
.../CacheDiscoveryDataConcurrentJoinTest.java | 17 +
.../CacheLateAffinityAssignmentTest.java | 6 +-
.../IgniteCachePartitionLossPolicySelfTest.java | 19 +-
.../IgniteCacheReadFromBackupTest.java | 5 +-
...sabledMultiNodeWithGroupFullApiSelfTest.java | 35 +
.../atomic/IgniteCacheAtomicProtocolTest.java | 7 +-
...AtomicMultiNodeWithGroupFullApiSelfTest.java | 34 +
...nabledMultiNodeWithGroupFullApiSelfTest.java | 35 +
.../near/GridCacheNearReadersSelfTest.java | 4 +-
...tionedMultiNodeWithGroupFullApiSelfTest.java | 34 +
.../GridCacheReplicatedPreloadSelfTest.java | 3 +-
.../IgniteCacheClientNearCacheExpiryTest.java | 20 +-
.../expiry/IgniteCacheTtlCleanupSelfTest.java | 2 +-
...IgniteCacheJdbcBlobStoreNodeRestartTest.java | 3 +
...acheLocalAtomicWithGroupFullApiSelfTest.java | 34 +
.../local/GridCacheLocalFullApiSelfTest.java | 1 -
.../GridCacheLocalWithGroupFullApiSelfTest.java | 34 +
.../TxOptimisticDeadlockDetectionTest.java | 2 +-
.../TxPessimisticDeadlockDetectionTest.java | 2 +-
.../loadtests/hashmap/GridCacheTestContext.java | 6 +-
.../communication/GridCacheMessageSelfTest.java | 30 +
.../testframework/junits/GridAbstractTest.java | 17 +
.../junits/common/GridCommonAbstractTest.java | 64 +
.../IgniteCacheFullApiSelfTestSuite.java | 13 +
.../testsuites/IgniteCacheTestSuite3.java | 3 +
.../query/h2/database/H2PkHashIndex.java | 4 +-
.../query/h2/database/H2RowFactory.java | 2 +-
.../h2/twostep/GridReduceQueryExecutor.java | 12 +-
.../cache/IgniteCacheGroupsSqlTest.java | 144 ++
.../cache/IgniteCacheNoClassQuerySelfTest.java | 23 +-
.../IgniteCacheWithIndexingTestSuite.java | 3 +
.../yardstick/IgniteBenchmarkArguments.java | 12 +
.../org/apache/ignite/yardstick/IgniteNode.java | 3 +
176 files changed, 9049 insertions(+), 3043 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 6601591,c3311a8..875f684
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@@ -515,10 -527,10 +532,10 @@@ public class CacheAffinitySharedManager
/**
*
*/
- public void removeAllCacheInfo(){
+ public void removeAllCacheInfo() {
- caches.clear();
+ grpHolders.clear();
- registeredCaches.clear();
+ registeredGrps.clear();
}
/**
@@@ -720,8 -733,7 +738,8 @@@
* @param nodeId Node ID.
* @param res Response.
*/
- private void processAffinityAssignmentResponse(UUID nodeId,
- private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) {
++ private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId,
+ GridDhtAffinityAssignmentResponse res) {
if (log.isDebugEnabled())
log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c7d1fa7,96ae0b9..976b843
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -845,10 -844,9 +846,9 @@@ public class GridCachePartitionExchange
/**
* @param nodes Nodes.
- * @return {@code True} if message was sent, {@code false} if node left grid.
*/
- private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
+ private void sendAllPartitions(Collection<ClusterNode> nodes) {
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, null, null, true);
if (log.isDebugEnabled())
log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
@@@ -1266,13 -1253,13 +1260,13 @@@
GridDhtPartitionTopology top = null;
- if (cacheCtx == null)
- top = clientTops.get(cacheId);
- else if (!cacheCtx.isLocal())
- top = cacheCtx.topology();
+ if (grp == null)
+ top = clientTops.get(grpId);
+ else if (!grp.isLocal())
+ top = grp.topology();
if (top != null)
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), cacheId)) != null;
}
if (!cctx.kernalContext().clientNode() && updated)
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3f26871,e00ba5f..7dd457b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -73,9 -73,9 +73,10 @@@ import org.apache.ignite.internal.pagem
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.database.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.database.MemoryPolicy;
import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
@@@ -1229,13 -1291,27 +1293,19 @@@ public class GridCacheProcessor extend
ctx.kernalContext().cache().context().database().onCacheStop(ctx);
+ ctx.kernalContext().cache().context().snapshot().onCacheStop(ctx);
+
+ ctx.group().stopCache(ctx, destroy);
+
U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore()));
- if (log.isInfoEnabled())
- log.info("Stopped cache: " + cache.name());
+ if (log.isInfoEnabled()) {
+ if (ctx.group().sharedGroup())
+ log.info("Stopped cache [cacheName=" + cache.name() + ", group=" + ctx.group().name() + ']');
+ else
+ log.info("Stopped cache [cacheName=" + cache.name() + ']');
+ }
- if (sharedCtx.pageStore() != null) {
- try {
- sharedCtx.pageStore().shutdownForCache(ctx, destroy);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
- "[cache=" + ctx.name() + "]", e);
- }
- }
-
cleanup(ctx);
}
@@@ -1855,10 -1930,9 +1928,12 @@@
startCache(cache, schema != null ? schema : new QuerySchema());
+ grp.onCacheStarted(cacheCtx);
+
onKernalStart(cache);
+
+ if (proxyRestart)
+ proxy.onRestarted(cacheCtx, cache);
}
/**
@@@ -1963,28 -2069,16 +2088,31 @@@
for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
stopGateway(action.request());
- prepareCacheStop(action.request());
+ sharedCtx.database().checkpointReadLock();
+
+ try {
+ stopCtx = prepareCacheStop(action.request());
+ destroy = action.request().destroy();
+ }
+ finally {
+ sharedCtx.database().checkpointReadUnlock();
+ }
+
+ if (stopCtx != null) {
+ if (stopped == null)
+ stopped = new ArrayList<>();
+
+ stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy));
+ }
}
- for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) {
- String cacheName = req.cacheName();
+ for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop())
+ stopCacheGroup(grpDesc.groupId());
+
+ for (ExchangeActions.ActionData req : exchActions.closeRequests(ctx.localNodeId())) {
+ String cacheName = req.request().cacheName();
- IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
+ IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(cacheName);
if (proxy != null) {
if (proxy.context().affinityNode()) {
@@@ -2000,28 -2093,13 +2128,33 @@@
proxy.context().gate().onStopped();
- CacheGroupInfrastructure grp = prepareCacheStop(req.request());
+ sharedCtx.database().checkpointReadLock();
+
+ try {
+ stopCtx = prepareCacheStop(req);
+
- if (grp != null && !grp.hasCaches())
- stopCacheGroup(grp.groupId());
+ destroy = req.destroy();
++
++ if (stopCtx != null && !stopCtx.group().hasCaches())
++ stopCacheGroup(stopCtx.groupId());
++
+ }
+ finally {
+ sharedCtx.database().checkpointReadUnlock();
+ }
}
}
+
+ if (stopCtx != null) {
+ if (stopped == null)
+ stopped = new ArrayList<>();
+
+ stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy));
+ }
}
+
+ if (stopped != null && !sharedCtx.kernalContext().clientNode())
+ sharedCtx.database().onCachesStopped(stopped);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 5bcefda,d344e20..7ea1f9a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@@ -96,9 -126,11 +126,10 @@@ public interface IgniteCacheOffheapMana
public CacheDataStore dataStore(GridDhtLocalPartition part);
/**
- * @param p Partition ID.
* @param store Data store.
+ * @throws IgniteCheckedException If failed.
*/
- public void destroyCacheDataStore(int p, CacheDataStore store) throws IgniteCheckedException;
+ public void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException;
/**
* TODO: GG-10884, used on only from initialValue.
@@@ -147,16 -185,7 +182,17 @@@
) throws IgniteCheckedException;
/**
+ * @param key Key.
+ * @param part Partition.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void updateIndexes(
+ KeyCacheObject key,
+ GridDhtLocalPartition part
+ ) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
* @param key Key.
* @param partId Partition number.
* @param part Partition.
@@@ -348,26 -403,26 +410,30 @@@
@Nullable CacheDataRow oldRow) throws IgniteCheckedException;
/**
+ * @param cctx Cache context.
* @param key Key.
- * @param part Partition.
* @param val Value.
* @param ver Version.
* @param expireTime Expire time.
* @param oldRow Old row if available.
* @throws IgniteCheckedException If failed.
*/
- void update(KeyCacheObject key,
+ void update(
+ GridCacheContext cctx,
+ KeyCacheObject key,
- int part,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException;
/**
+ * @param key Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ void updateIndexes(KeyCacheObject key) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Cache context.
* @param key Key.
* @param c Closure.
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 947421b,8da7357..db08801
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@@ -81,8 -78,21 +79,20 @@@ import static org.apache.ignite.interna
*
*/
@SuppressWarnings("PublicInnerClass")
- public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter implements IgniteCacheOffheapManager {
+ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
+ /** */
+ private static final int UNDEFINED_CACHE_ID = 0;
+
+ /** */
+ protected GridCacheSharedContext ctx;
+
+ /** */
+ protected CacheGroupInfrastructure grp;
+
+ /** */
+ protected IgniteLogger log;
+
/** */
- // TODO GG-11208 need restore size after restart.
private CacheDataStore locCacheDataStore;
/** */
@@@ -118,15 -122,15 +125,17 @@@
}
/** {@inheritDoc} */
- @Override protected void start0() throws IgniteCheckedException {
- super.start0();
+ @Override public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException {
+ this.ctx = ctx;
+ this.grp = grp;
+ this.log = ctx.logger(getClass());
+ indexingEnabled = QueryUtils.isEnabled(cctx.config());
+
- updateValSizeThreshold = cctx.shared().database().pageSize() / 2;
+ updateValSizeThreshold = ctx.database().pageSize() / 2;
- if (cctx.affinityNode()) {
- cctx.shared().database().checkpointReadLock();
+ if (grp.affinityNode()) {
+ ctx.database().checkpointReadLock();
try {
initDataStructures();
@@@ -170,9 -175,21 +180,21 @@@
}
/** {@inheritDoc} */
- @Override protected void onKernalStop0(boolean cancel) {
- super.onKernalStop0(cancel);
+ @Override public void stop() {
+ try {
+ for (CacheDataStore store : cacheDataStores())
- store.destroy();
++ destroyCacheDataStore(store);
+
+ if (pendingEntries != null)
+ pendingEntries.destroy();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
+ }
+ /** {@inheritDoc} */
+ @Override public void onKernalStop() {
busyLock.block();
}
@@@ -229,10 -253,10 +258,10 @@@
* @return Partition data.
*/
@Nullable private CacheDataStore partitionData(int p) {
- if (cctx.isLocal())
+ if (grp.isLocal())
return locCacheDataStore;
else {
- GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true);
- GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false);
++ GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true);
return part != null ? part.dataStore() : null;
}
@@@ -341,12 -351,7 +355,12 @@@
) throws IgniteCheckedException {
assert expireTime >= 0;
- dataStore(part).update(key, val, ver, expireTime, oldRow);
- dataStore(part).update(cctx, key, partId, val, ver, expireTime, oldRow);
++ dataStore(part).update(cctx, key, val, ver, expireTime, oldRow);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateIndexes(KeyCacheObject key, GridDhtLocalPartition part) throws IgniteCheckedException {
+ dataStore(part).updateIndexes(key);
}
/** {@inheritDoc} */
@@@ -419,34 -425,27 +434,34 @@@
* @param readers {@code True} to clear readers.
*/
@SuppressWarnings("unchecked")
- @Override public void clear(boolean readers) {
+ @Override public void clear(GridCacheContext cctx, boolean readers) {
GridCacheVersion obsoleteVer = null;
- GridIterator<CacheDataRow> it = rowsIterator(true, true, null);
+ GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator());
while (it.hasNext()) {
- KeyCacheObject key = it.next().key();
+ cctx.shared().database().checkpointReadLock();
try {
- if (obsoleteVer == null)
- obsoleteVer = ctx.versions().next();
+ KeyCacheObject key = it.next().key();
- GridCacheEntryEx entry = cctx.cache().entryEx(key);
+ try {
+ if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
++ obsoleteVer = ctx.versions().next();
- entry.clear(obsoleteVer, readers);
- }
- catch (GridDhtInvalidPartitionException ignore) {
- // Ignore.
+ GridCacheEntryEx entry = cctx.cache().entryEx(key);
+
+ entry.clear(obsoleteVer, readers);
+ }
+ catch (GridDhtInvalidPartitionException ignore) {
+ // Ignore.
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to clear cache entry: " + key, e);
+ }
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to clear cache entry: " + key, e);
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
}
}
}
@@@ -826,39 -826,41 +864,46 @@@
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
int amount
) throws IgniteCheckedException {
+ assert !cctx.isNear() : cctx.name();
+
if (hasPendingEntries && pendingEntries != null) {
- GridCacheVersion obsoleteVer = null;
+ cctx.shared().database().checkpointReadLock();
- long now = U.currentTimeMillis();
+ try {
+ GridCacheVersion obsoleteVer = null;
+
+ long now = U.currentTimeMillis();
- GridCursor<PendingRow> cur = pendingEntries.find(START_PENDING_ROW, new PendingRow(now, 0));
- GridCursor<PendingRow> cur;
++ GridCursor<PendingRow> cur;
+
+ if (grp.sharedGroup())
+ cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+ else
+ cur = pendingEntries.find(null, new PendingRow(UNDEFINED_CACHE_ID, now, 0));
- int cleared = 0;
+ int cleared = 0;
- while (cur.next()) {
- PendingRow row = cur.get();
+ while (cur.next()) {
+ PendingRow row = cur.get();
- if (amount != -1 && cleared > amount)
- return true;
+ if (amount != -1 && cleared > amount)
+ return true;
if (row.key.partition() == -1)
- row.key.partition(cctx.affinity().partition(row.key));
+ row.key.partition(cctx.affinity().partition(row.key));assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
- if (pendingEntries.remove(row) != null) {
- assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
++ if (pendingEntries.removex(row)) {
+ if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
++ obsoleteVer = ctx.versions().next();
- if (pendingEntries.removex(row)) {
- if (obsoleteVer == null)
- obsoleteVer = ctx.versions().next();
+ c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
+ }
- c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
+ cleared++;
}
-
- cleared++;
+ }
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
}
}
@@@ -1043,8 -1101,10 +1144,8 @@@
}
/** {@inheritDoc} */
-- @Override public void update(
- GridCacheContext cctx,
-- KeyCacheObject key,
- int p,
++ @Override public void update(GridCacheContext cctx,KeyCacheObject key,
++
CacheObject val,
GridCacheVersion ver,
long expireTime,
@@@ -1056,10 -1115,11 +1157,11 @@@
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED ?
- cctx.cacheId() : 0;
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
+
+ assert oldRow == null || oldRow.cacheId() == cacheId : oldRow;
- DataRow dataRow = new DataRow(key, val, ver, p, expireTime, cacheId);
+ DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId);
CacheObjectContext coCtx = cctx.cacheObjectContext();
@@@ -1152,28 -1216,7 +1261,28 @@@
}
/** {@inheritDoc} */
+ @Override public void updateIndexes(KeyCacheObject key) throws IgniteCheckedException {
+ if (indexingEnabled) {
+ CacheDataRow row = dataTree.findOne(new SearchRow(key));
+
+ GridCacheQueryManager qryMgr = cctx.queries();
+
+ if (row != null) {
+ qryMgr.store(
+ key,
+ partId,
+ null,
+ null,
+ row.value(),
+ row.version(),
+ row.expireTime(),
+ row.link());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
- @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException {
+ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index c966877,0955a51..59a5bb9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@@ -306,9 -305,9 +305,9 @@@ public abstract class GridDistributedCa
IgniteCacheOffheapManager offheap = ctx.offheap();
- if (ctx.affinity().primaryByPartition(ctx.localNode(), partition, topVer) && modes.primary ||
- ctx.affinity().backupByPartition(ctx.localNode(), partition, topVer) && modes.backup)
- size += offheap.cacheEntriesCount(ctx.cacheId(), partition);
+ if (ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer) && modes.primary ||
+ ctx.affinity().backupByPartition(ctx.localNode(), part, topVer) && modes.backup)
- size += offheap.entriesCount(part);
++ size += offheap.cacheEntriesCount(ctx.cacheId(), part);
}
return size;
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 31edeea,e94415c..fe92cfb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@@ -355,14 -353,8 +355,14 @@@ public class GridClientPartitionTopolog
}
/** {@inheritDoc} */
+ @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer,
+ boolean create, boolean showRenting) throws GridDhtInvalidPartitionException {
+ return localPartition(p, topVer, create);
+ }
+
+ /** {@inheritDoc} */
- @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
- return localPartition(1, AffinityTopologyVersion.NONE, create);
+ @Override public GridDhtLocalPartition localPartition(int p) {
+ return localPartition(p, AffinityTopologyVersion.NONE, false);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 1d910a3,bbb3cc5..5fd7ada
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@@ -54,7 -61,7 +61,8 @@@ import org.apache.ignite.internal.util.
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@@ -133,22 -152,35 +153,36 @@@ public class GridDhtLocalPartition exte
* reservation is released. */
private volatile boolean shouldBeRenting;
+ /** Set if partition must be re-created and preloaded after eviction. */
+ private boolean reload;
+
/**
- * @param cctx Context.
+ * @param ctx Context.
+ * @param grp Cache group.
* @param id Partition ID.
* @param entryFactory Entry factory.
*/
- @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") GridDhtLocalPartition(GridCacheContext cctx,
- int id, GridCacheMapEntryFactory entryFactory) {
- super(cctx, entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions()));
+ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+ GridDhtLocalPartition(GridCacheSharedContext ctx,
- CacheGroupInfrastructure grp,
- int id,
- GridCacheMapEntryFactory entryFactory) {
- super(entryFactory);
++ CacheGroupInfrastructure grp, int id, GridCacheMapEntryFactory entryFactory) {
++ super(entryFactory);
this.id = id;
- this.cctx = cctx;
+ this.ctx = ctx;
+ this.grp = grp;
- log = U.logger(cctx.kernalContext(), logRef, this);
+ log = U.logger(ctx.kernalContext(), logRef, this);
+
+ if (grp.sharedGroup()) {
+ singleCacheEntryMap = null;
+ cachesEntryMaps = new ConcurrentHashMap<>();
+ cacheSizes = new ConcurrentHashMap<>();
+ }
+ else {
+ singleCacheEntryMap = createEntriesMap();
+ cachesEntryMaps = null;
+ cacheSizes = null;
+ }
rent = new GridFutureAdapter<Object>() {
@Override public String toString() {
@@@ -756,7 -867,9 +870,7 @@@
*/
private void destroyCacheDataStore() {
try {
- cctx.offheap().destroyCacheDataStore(dataStore());
- CacheDataStore store = dataStore();
-
- grp.offheap().destroyCacheDataStore(id, store);
++ grp.offheap().destroyCacheDataStore(dataStore());
}
catch (IgniteCheckedException e) {
log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e);
@@@ -900,31 -1030,24 +1026,29 @@@
true,
false);
- if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+ cctx.shared().database().checkpointReadLock();
+
- try {
- if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
- if (rec) {
- cctx.events().addEvent(cached.partition(),
- cached.key(),
- cctx.localNodeId(),
- (IgniteUuid)null,
- null,
- EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
- null,
- false,
- cached.rawGet(),
- cached.hasValue(),
- null,
- null,
- null,
- false);
- }
++ try {
++ if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+ if (rec) {
+ cctx.events().addEvent(cached.partition(),
+ cached.key(),
+ ctx.localNodeId(),
+ (IgniteUuid)null,
+ null,
+ EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+ null,
+ false,
+ cached.rawGet(),
+ cached.hasValue(),
+ null,
+ null,
+ null,
- false);
++ false);
++ }
}
}
+ finally {
+ cctx.shared().database().checkpointReadUnlock();
+ }
}
catch (GridDhtInvalidPartitionException e) {
assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 938c1be,1a36e4d..f006c83
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -68,7 -73,8 +70,8 @@@ import static org.apache.ignite.interna
/**
* Partition topology.
*/
- @GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+ @GridToStringExclude
-public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
++public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** If true, then check consistency. */
private static final boolean CONSISTENCY_CHECK = false;
@@@ -127,20 -136,24 +133,26 @@@
private volatile boolean treatAllPartAsLoc;
/**
- * @param cctx Context.
+ * @param ctx Cache shared context.
+ * @param grp Cache group.
* @param entryFactory Entry factory.
*/
- GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) {
- assert cctx != null;
-
- this.cctx = cctx;
+ public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx,
+ CacheGroupInfrastructure grp,
+ GridCacheMapEntryFactory entryFactory) {
+ assert ctx != null;
+ assert grp != null;
+ assert entryFactory != null;
+
+ this.ctx = ctx;
+ this.grp = grp;
this.entryFactory = entryFactory;
- log = cctx.logger(getClass());
+ log = ctx.logger(getClass());
- locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
+ locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
+
+ part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f);
}
/** {@inheritDoc} */
@@@ -404,100 -503,119 +415,109 @@@
ClusterState newState = exchFut.newClusterState();
treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE)
- || (cctx.kernalContext().state().active()
+ || (ctx.kernalContext().state().active()
&& discoEvt.type() == EventType.EVT_NODE_JOINED
&& discoEvt.eventNode().isLocal()
- && !cctx.kernalContext().clientNode()
+ && !ctx.kernalContext().clientNode()
);
- ClusterNode loc = cctx.localNode();
- // Wait for rent outside of checkpoint lock.
- waitForRent();
-
+ ClusterNode loc = ctx.localNode();
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
- synchronized (ctx.exchange().interruptLock()) {
- if (Thread.currentThread().isInterrupted())
- throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
+ try {
- synchronized (cctx.shared().exchange().interruptLock()) {
++ synchronized (ctx.exchange().interruptLock()) {
+ if (Thread.currentThread().isInterrupted())
+ throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread());
- try {
U.writeLock(lock);
- }
- catch (IgniteInterruptedCheckedException e) {
- ctx.database().checkpointReadUnlock();
- throw e;
- }
+ try {
+ GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
- try {
- GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+ if (stopping)
+ return;
- if (stopping)
- return;
+ assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
+ topVer + ", exchId=" + exchId + ']';
- assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
- topVer + ", exchId=" + exchId + ']';
+ if (exchId.isLeft())
+ removeNode(exchId.nodeId());
- if (exchId.isLeft())
- removeNode(exchId.nodeId());
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
- ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+ if (log.isDebugEnabled())
+ log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
- if (log.isDebugEnabled())
- log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+ long updateSeq = this.updateSeq.incrementAndGet();
- long updateSeq = this.updateSeq.incrementAndGet();
+ cntrMap.clear();
- cntrMap.clear();
+ // If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) {
++ if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) {
+ if (node2part == null) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
- // If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) {
- if (node2part == null) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+ if (log.isDebugEnabled())
+ log.debug("Created brand new full topology map on oldest node [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
+ }
+ else if (!node2part.valid()) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- if (log.isDebugEnabled())
- log.debug("Created brand new full topology map on oldest node [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
- }
- else if (!node2part.valid()) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+ if (log.isDebugEnabled())
+ log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+ node2part + ']');
+ }
+ else if (!node2part.nodeId().equals(loc.id())) {
+ node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- if (log.isDebugEnabled())
- log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
- node2part + ']');
+ if (log.isDebugEnabled())
+ log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+ exchId + ", fullMap=" + fullMapString() + ']');
+ }
}
- else if (!node2part.nodeId().equals(loc.id())) {
- node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
- if (log.isDebugEnabled())
- log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
- exchId + ", fullMap=" + fullMapString() + ']');
+ if (affReady)
+ initPartitions0(exchFut, updateSeq);
+ else {
- List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
++ List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
+
+ createPartitions(aff, updateSeq);
}
- }
- if (affReady)
- initPartitions0(exchFut, updateSeq);
- else {
- List<List<ClusterNode>> aff = grp.affinity().idealAssignment();
+ consistencyCheck();
- createPartitions(aff, updateSeq);
+ if (log.isDebugEnabled())
+ log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+ fullMapString() + ']');
+ }
+ finally {
+ lock.writeLock().unlock();
}
-
- consistencyCheck();
-
- if (log.isDebugEnabled())
- log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
- fullMapString() + ']');
- }
- finally {
- lock.writeLock().unlock();
-
- ctx.database().checkpointReadUnlock();
}
}
-
- // Wait for evictions.
- waitForRent();
+ finally {
- cctx.shared().database().checkpointReadUnlock();
++ ctx.database().checkpointReadUnlock();
+ }
}
+ /**
+ * @param p Partition number.
+ * @param topVer Topology version.
+ * @return {@code True} if given partition belongs to local node.
+ */
+ private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
+ return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
+ }
+
/** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
treatAllPartAsLoc = false;
- boolean changed = waitForRent();
+ boolean changed = false;
- int num = cctx.affinity().partitions();
+ int num = grp.affinity().partitions();
AffinityTopologyVersion topVer = exchFut.topologyVersion();
@@@ -522,9 -640,9 +542,9 @@@
long updateSeq = this.updateSeq.incrementAndGet();
for (int p = 0; p < num; p++) {
- GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
+ GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false);
- if (cctx.affinity().partitionLocalNode(p, topVer)) {
+ if (partitionLocalNode(p, topVer)) {
// This partition will be created during next topology event,
// which obviously has not happened at this point.
if (locPart == null) {
@@@ -866,8 -980,8 +890,8 @@@
for (UUID nodeId : nodeIds) {
HashSet<UUID> affIds = affAssignment.getIds(p);
- if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) {
+ if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) {
- ClusterNode n = cctx.discovery().node(nodeId);
+ ClusterNode n = ctx.discovery().node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null) {
@@@ -1113,13 -1225,15 +1141,13 @@@
}
}
- part2node = p2n;
-
boolean changed = false;
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion affVer = grp.affinity().lastVersion();
- GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId());
+ GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
- if (nodeMap != null && cctx.shared().database().persistenceEnabled()) {
+ if (nodeMap != null && ctx.database().persistenceEnabled()) {
for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
int p = e.getKey();
GridDhtPartitionState state = e.getValue();
@@@ -1185,10 -1259,8 +1213,10 @@@
}
}
+ long updateSeq = this.updateSeq.incrementAndGet();
+
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
changed |= checkEvictions(updateSeq, aff);
@@@ -1531,18 -1599,9 +1563,18 @@@
GridDhtLocalPartition locPart = locParts.get(p);
if (locPart != null) {
- if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) {
- if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId()))
- locPart.moving();
++ if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) {
+ if (haveHistory)
+ locPart.moving();
+ else {
+ locPart.rent(false);
+
+ locPart.reload(true);
+
+ result.add(cctx.localNodeId());
+ }
+
+ }
}
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index ef6a3b9,04a7e97..4a693bf
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@@ -89,9 -85,8 +89,9 @@@ public class GridDhtPartitionDemandMess
* @param cp Message to copy from.
* @param parts Partitions.
*/
- GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, Map<Integer, Long> partsCntrs) {
+ GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts,
+ Map<Integer, Long> partsCntrs) {
- cacheId = cp.cacheId;
+ grpId = cp.grpId;
updateSeq = cp.updateSeq;
topic = cp.topic;
timeout = cp.timeout;
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 77a645e,c9a6525..485baee
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@@ -610,18 -594,11 +606,20 @@@ public class GridDhtPartitionDemander
return;
}
- final GridDhtPartitionTopology top = cctx.dht().topology();
+ final GridDhtPartitionTopology top = grp.topology();
+ final boolean statsEnabled = cctx.config().isStatisticsEnabled();
+
+ if (statsEnabled) {
+ if (supply.estimatedKeysCount() != -1)
+ cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount());
+
+ cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+ }
+
try {
+ AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
int p = e.getKey();
@@@ -867,16 -836,13 +867,14 @@@
long updateSeq) {
assert assigns != null;
- this.exchFut = assigns.exchangeFuture();
- this.topVer = assigns.topologyVersion();
+ exchFut = assigns.exchangeFuture();
+ topVer = assigns.topologyVersion();
+
- this.cctx = cctx;
+ this.grp = grp;
this.log = log;
- this.startedEvtSent = startedEvtSent;
- this.stoppedEvtSent = stoppedEvtSent;
this.updateSeq = updateSeq;
+
+ ctx = grp.shared();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0ff03f7,ce5f9ea..0c907f5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -295,16 -285,10 +300,16 @@@ class GridDhtPartitionSupplier
IgniteRebalanceIterator iter;
if (sctx == null || sctx.entryIt == null) {
- iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(),
- iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
++ iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(),
+ d.isHistorical(part) ? d.partitionCounter(part) : null);
+
+ if (!iter.historical()) {
+ assert !cctx.shared().database().persistenceEnabled() || !d.isHistorical(part);
- if (!iter.historical())
s.clean(part);
+ }
+ else
+ assert cctx.shared().database().persistenceEnabled() && d.isHistorical(part);
}
else
iter = (IgniteRebalanceIterator)sctx.entryIt;
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 1cb32e3,5d02f3f..9f66491
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@@ -74,14 -75,12 +75,14 @@@ public class GridDhtPartitionSupplyMess
private Map<Integer, CacheEntryInfoCollection> infos;
/** Message size. */
- @GridDirectTransient
private int msgSize;
+ /** Estimated keys count. */
+ private long estimatedKeysCnt = -1;
+
/**
* @param updateSeq Update sequence for this node.
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param topVer Topology version.
* @param addDepInfo Deployment info flag.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c75b0a2,6725773..65edd96
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -57,11 -55,11 +57,12 @@@ import org.apache.ignite.internal.pagem
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+ import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@@ -714,16 -643,16 +733,16 @@@ public class GridDhtPartitionsExchangeF
long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence();
- GridDhtPartitionTopology top = cacheCtx.topology();
+ GridDhtPartitionTopology top = grp.topology();
if (crd) {
- boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
if (updateTop && clientTop != null)
- top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
+ top.update(this, clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.<Integer>emptySet());
}
- top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
+ top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId()));
}
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
@@@ -827,11 -756,12 +846,15 @@@
if (updateTop) {
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- if (top.cacheId() == cacheCtx.cacheId()) {
- cacheCtx.topology().update(this,
- top.partitionMap(true),
+ if (top.groupId() == grp.groupId()) {
+ GridDhtPartitionFullMap fullMap = top.partitionMap(true);
+
+ assert fullMap != null;
+
- grp.topology().update(exchId, fullMap, top.updateCounters(false));
++ grp.topology().update(this,
++ fullMap,
+ top.updateCounters(false),
+ Collections.<Integer>emptySet());
break;
}
@@@ -869,18 -799,13 +892,18 @@@
assert !cctx.kernalContext().clientNode();
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
continue;
- cacheCtx.preloader().onTopologyChanged(this);
+ grp.preloader().onTopologyChanged(this);
}
+ cctx.database().releaseHistoryForPreloading();
+
+ // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
+ partHistReserved = cctx.database().reserveHistoryForExchange();
+
waitPartitionRelease();
boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
@@@ -1212,12 -1138,9 +1248,12 @@@
@Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
boolean realExchange = !dummy && !forcePreload;
+ if (!done.compareAndSet(false, true))
+ return dummy;
+
if (err == null && realExchange) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
continue;
try {
@@@ -1607,12 -1489,11 +1647,12 @@@
*/
private void assignPartitionStates(GridDhtPartitionTopology top) {
Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
+ Map<Integer, Long> minCntrs = new HashMap<>();
- for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
- assert e.getValue().partitionUpdateCounters(top.cacheId()) != null;
+ assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
- for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) {
+ for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
int p = e0.getKey();
UUID uuid = e.getKey();
@@@ -1768,12 -1592,10 +1816,12 @@@
try {
assert crd.isLocal();
+ assert partHistSuppliers.isEmpty();
+
if (!crd.equals(discoCache.serverNodes().get(0))) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal())
- cacheCtx.topology().beforeExchange(this, !centralizedAff);
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (!grp.isLocal())
+ grp.topology().beforeExchange(this, !centralizedAff);
}
}
@@@ -1975,25 -1797,20 +2023,25 @@@
private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) {
cctx.versions().onExchange(msg.lastVersion().order());
+ assert partHistSuppliers.isEmpty();
+
+ partHistSuppliers.putAll(msg.partitionHistorySuppliers());
+
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
- Integer cacheId = entry.getKey();
+ Integer grpId = entry.getKey();
- Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId);
+ Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
- if (cacheCtx != null)
- cacheCtx.topology().update(this, entry.getValue(), cntrMap,
+ if (grp != null)
- grp.topology().update(exchId, entry.getValue(), cntrMap);
++ grp.topology().update(this, entry.getValue(), cntrMap,
+ msg.partsToReload(cctx.localNodeId(), cacheId));
else {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
- cctx.exchange().clientTopology(cacheId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet());
- cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap);
++ cctx.exchange().clientTopology(grpId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 94ad21e,f9bc5df..b64a58c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@@ -121,10 -102,13 +121,15 @@@ public class GridDhtPartitionsFullMessa
assert id == null || topVer.equals(id.topologyVersion());
this.topVer = topVer;
+ this.partHistSuppliers = partHistSuppliers;
+ this.partsToReload = partsToReload;
}
+ /** {@inheritDoc} */
+ @Override public int handlerId() {
+ return 0;
+ }
+
/**
* @param compress {@code True} if it is possible to use compression for message.
*/
@@@ -175,24 -161,27 +182,23 @@@
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param cntrMap Partition update counters.
*/
- public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+ public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
if (partCntrs == null)
- partCntrs = new HashMap<>();
+ partCntrs = new IgniteDhtPartitionCountersMap();
- partCntrs.putIfAbsent(cacheId, cntrMap);
- if (!partCntrs.containsKey(grpId))
- partCntrs.put(grpId, cntrMap);
++ partCntrs.putIfAbsent(grpId, cntrMap);
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @return Partition update counters.
*/
- @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
- if (partCntrs != null) {
- return partCntrs.get(cacheId);
- }
+ @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
- if (partCntrs != null) {
- Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
-
- return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
- }
++ if (partCntrs != null)
++ return partCntrs.get(grpId);
return Collections.emptyMap();
}
@@@ -414,25 -356,13 +420,25 @@@
writer.incrementState();
- case 9:
+ case 8:
- if (!writer.writeByteArray("partsBytes", partsBytes))
+ if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
return false;
writer.incrementState();
- case 10:
+ case 9:
+ if (!writer.writeByteArray("partsBytes", partsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
if (!writer.writeMessage("topVer", topVer))
return false;
@@@ -478,31 -408,15 +484,31 @@@
reader.incrementState();
- case 9:
+ case 8:
- partsBytes = reader.readByteArray("partsBytes");
+ partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 10:
+ case 9:
+ partsBytes = reader.readByteArray("partsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 9222251,416b127..9e399f1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@@ -375,13 -320,7 +380,13 @@@ public class GridDhtPartitionsSingleMes
writer.incrementState();
- case 10:
+ case 9:
+ if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
if (!writer.writeByteArray("partsBytes", partsBytes))
return false;
@@@ -435,15 -374,7 +440,15 @@@
reader.incrementState();
- case 10:
+ case 9:
+ partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
partsBytes = reader.readByteArray("partsBytes");
if (!reader.isLastRead())