You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/05 14:47:41 UTC
[1/4] ignite git commit: ignite-5075
Repository: ignite
Updated Branches:
refs/heads/ignite-5075 e6ebae167 -> 0096266b5
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 59d986a..8711bf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -119,7 +119,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
/** {@inheritDoc} */
@Override public void onReconnected() {
map = new GridCacheLocalConcurrentMap(
- ctx,
entryFactory(),
ctx.config().getNearConfiguration().getNearStartSize());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
index 18c0b32..52f19b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
@@ -112,7 +112,8 @@ public class GridCacheTtlManagerSelfTest extends GridCommonAbstractTest {
assertNull(g.cache(DEFAULT_CACHE_NAME).get(key));
if (!g.internalCache(DEFAULT_CACHE_NAME).context().deferredDelete())
- assertNull(g.internalCache(DEFAULT_CACHE_NAME).map().getEntry(g.internalCache(DEFAULT_CACHE_NAME).context().toCacheKeyObject(key)));
+ assertNull(g.internalCache(DEFAULT_CACHE_NAME).map().getEntry(g.internalCache(DEFAULT_CACHE_NAME).context(),
+ g.internalCache(DEFAULT_CACHE_NAME).context().toCacheKeyObject(key)));
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
index 872fe77..863ab38 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
@@ -370,7 +370,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
GridCacheAdapter cache = grid.internalCache(DEFAULT_CACHE_NAME);
- GridCacheMapEntry entry = cache.map().getEntry(cache.context().toCacheKeyObject(key));
+ GridCacheMapEntry entry = cache.map().getEntry(cache.context(), cache.context().toCacheKeyObject(key));
log.info("Entry: " + entry);
@@ -383,7 +383,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
}
if (cache.isNear()) {
- entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(cache.context().toCacheKeyObject(key));
+ entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(cache.context(), cache.context().toCacheKeyObject(key));
log.info("Dht entry: " + entry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
index 9d21823..227fe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java
@@ -82,6 +82,6 @@ public class IgniteCacheTtlCleanupSelfTest extends GridCacheAbstractSelfTest {
CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
for (int i = 0; i < 100; i++)
- assertNull(cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(cacheObjCtx, null, i, true)));
+ assertNull(cacheAdapter.map().getEntry(cacheAdapter.context(), cacheObjects.toCacheKeyObject(cacheObjCtx, null, i, true)));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
index b48c4ba..1573576 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
@@ -399,7 +399,7 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0);
- GridCacheMapEntry entry = map.getEntry(keyCacheObj);
+ GridCacheMapEntry entry = map.getEntry(intCache.context(), keyCacheObj);
if (entry != null)
assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
index bc297a2..61f7125 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPessimisticDeadlockDetectionTest.java
@@ -408,7 +408,7 @@ public class TxPessimisticDeadlockDetectionTest extends GridCommonAbstractTest {
KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0);
- GridCacheMapEntry entry = map.getEntry(keyCacheObj);
+ GridCacheMapEntry entry = map.getEntry(intCache.context(), keyCacheObj);
if (entry != null)
assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index e301993..5a7dbc1 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -84,9 +84,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
AffinityTopologyVersion.ZERO,
true,
true,
- null,
- null,
- null,
new GridCacheEventManager(),
new CacheOsStoreManager(null, new CacheConfiguration()),
new GridCacheEvictionManager(),
[2/4] ignite git commit: ignite-5075
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6634e98..e942b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -549,7 +549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
cntrMap.clear();
// If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+ if (oldest != null && (loc.equals(oldest) || grp.localStartVersion().equals(exchId.topologyVersion()))) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -598,6 +598,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
waitForRent();
}
+ private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
+ return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
+ }
+
/** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
treatAllPartAsLoc = false;
@@ -631,7 +635,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (int p = 0; p < num; p++) {
GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
- if (grp.affinity().partitionLocalNode(p, topVer)) {
+ if (partitionLocalNode(p, topVer)) {
// This partition will be created during next topology event,
// which obviously has not happened at this point.
if (locPart == null) {
@@ -724,11 +728,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtLocalPartition loc = locParts.get(p);
if (loc == null || loc.state() == EVICTED) {
- locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+ locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory));
- if (cctx.shared().pageStore() != null) {
+ if (ctx.pageStore() != null) {
try {
- cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+ // TODO IGNITE-5075.
+ ctx.pageStore().onPartitionCreated(grp.groupId(), p);
}
catch (IgniteCheckedException e) {
// TODO ignite-db
@@ -758,7 +763,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionState state = loc != null ? loc.state() : null;
- if (loc != null && state != EVICTED && (state != RENTING || !cctx.allowFastEviction()))
+ if (loc != null && state != EVICTED && (state != RENTING || !grp.allowFastEviction()))
return loc;
if (!create)
@@ -773,7 +778,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
state = loc != null ? loc.state() : null;
- boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
+ boolean belongs = partitionLocalNode(p, topVer);
if (loc != null && state == EVICTED) {
locParts.set(p, loc = null);
@@ -783,7 +788,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
}
- else if (loc != null && state == RENTING && cctx.allowFastEviction())
+ else if (loc != null && state == RENTING && grp.allowFastEviction())
throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted.");
if (loc == null) {
@@ -792,7 +797,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
- locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+ locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory));
if (updateSeq)
this.updateSeq.incrementAndGet();
@@ -807,9 +812,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.writeLock().unlock();
}
- if (created && cctx.shared().pageStore() != null) {
+ if (created && ctx.pageStore() != null) {
try {
- cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+ // TODO IGNITE-5075.
+ ctx.pageStore().onPartitionCreated(grp.groupId(), p);
}
catch (IgniteCheckedException e) {
// TODO ignite-db
@@ -834,8 +840,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
- return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
+ @Override public GridDhtLocalPartition localPartition(int part) {
+ return locParts.get(part);
}
/** {@inheritDoc} */
@@ -891,7 +897,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
map.put(i, part.state());
}
- return new GridDhtPartitionMap(cctx.nodeId(),
+ return new GridDhtPartitionMap(ctx.localNodeId(),
updateSeq.get(),
topVer,
Collections.unmodifiableMap(map),
@@ -931,7 +937,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
- AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+ AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer);
List<ClusterNode> affNodes = affAssignment.get(p);
@@ -954,8 +960,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
", topVer2=" + this.topVer +
- ", node=" + cctx.igniteInstanceName() +
- ", cache=" + cctx.name() +
+ ", node=" + ctx.igniteInstanceName() +
+ ", grp=" + grp.name() +
", node2part=" + node2part + ']';
List<ClusterNode> nodes = null;
@@ -967,7 +973,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
HashSet<UUID> affIds = affAssignment.getIds(p);
if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) {
- ClusterNode n = cctx.discovery().node(nodeId);
+ ClusterNode n = ctx.discovery().node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null) {
@@ -1001,7 +1007,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionState state,
GridDhtPartitionState... states) {
Collection<UUID> allIds = topVer.topologyVersion() > 0 ?
- F.nodeIds(discoCache.cacheGroupAffinityNodes(cctx.group().groupId())) :
+ F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) :
null;
lock.readLock().lock();
@@ -1010,7 +1016,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
", allIds=" + allIds +
", node2part=" + node2part +
- ", cache=" + cctx.name() + ']';
+ ", grp=" + grp.name() + ']';
Collection<UUID> nodeIds = part2node.get(p);
@@ -1027,7 +1033,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
continue;
if (hasState(p, id, state, states)) {
- ClusterNode n = cctx.discovery().node(id);
+ ClusterNode n = ctx.discovery().node(id);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
nodes.add(n);
@@ -1043,7 +1049,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
- if (!cctx.rebalanceEnabled())
+ if (!grp.rebalanceEnabled())
return ownersAndMoving(p, topVer);
return nodes(p, topVer, OWNING);
@@ -1056,7 +1062,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> moving(int p) {
- if (!cctx.rebalanceEnabled())
+ if (!grp.rebalanceEnabled())
return ownersAndMoving(p, AffinityTopologyVersion.NONE);
return nodes(p, AffinityTopologyVersion.NONE, MOVING);
@@ -1082,11 +1088,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
- ", cache=" + cctx.name() +
- ", started=" + cctx.started() +
+ ", grp=" + grp.name() +
", stopping=" + stopping +
- ", locNodeId=" + cctx.localNode().id() +
- ", locName=" + cctx.igniteInstanceName() + ']';
+ ", locNodeId=" + ctx.localNode().id() +
+ ", locName=" + ctx.igniteInstanceName() + ']';
GridDhtPartitionFullMap m = node2part;
@@ -1168,7 +1173,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// then we keep the newer value.
if (newPart != null &&
(newPart.updateSequence() < part.updateSequence() ||
- (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+ (grp.groupStartVersion().compareTo(newPart.topologyVersion()) > 0))
) {
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1182,7 +1187,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
- if (!cctx.discovery().alive(nodeId)) {
+ if (!ctx.discovery().alive(nodeId)) {
if (log.isDebugEnabled())
log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
partMap + ']');
@@ -1194,7 +1199,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
node2part = partMap;
- Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f);
+ Map<Integer, Set<UUID>> p2n = new HashMap<>(grp.affinity().partitions(), 1.0f);
for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
for (Integer p : e.getValue().keySet()) {
@@ -1213,11 +1218,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean changed = false;
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion affVer = grp.affinity().lastVersion();
- GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId());
+ GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
- if (nodeMap != null && cctx.shared().database().persistenceEnabled()) {
+ if (nodeMap != null && ctx.database().persistenceEnabled()) {
for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
int p = e.getKey();
GridDhtPartitionState state = e.getValue();
@@ -1244,7 +1249,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
changed |= checkEvictions(updateSeq, aff);
@@ -1257,7 +1262,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Partition map after full update: " + fullMapString());
if (changed)
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
return changed ? localPartitionMap() : null;
}
@@ -1276,7 +1281,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
- if (!cctx.discovery().alive(parts.nodeId())) {
+ if (!ctx.discovery().alive(parts.nodeId())) {
if (log.isDebugEnabled())
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
@@ -1371,10 +1376,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion affVer = grp.affinity().lastVersion();
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
changed |= checkEvictions(updateSeq, aff);
@@ -1387,7 +1392,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Partition map after single update: " + fullMapString());
if (changed)
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
return changed ? localPartitionMap() : null;
}
@@ -1401,7 +1406,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.writeLock().lock();
try {
- int parts = cctx.affinity().partitions();
+ int parts = grp.affinity().partitions();
Collection<Integer> lost = null;
@@ -1435,7 +1440,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean changed = false;
if (lost != null) {
- PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+ PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
assert plc != null;
@@ -1467,13 +1472,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
- cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
- discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+// TODO: IGNITE-5075.
+// if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
+// cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
+// discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
if (plc != PartitionLossPolicy.IGNORE)
- cctx.needsRecovery(true);
+ grp.needsRecovery(true);
}
return changed;
@@ -1488,7 +1494,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.writeLock().lock();
try {
- int parts = cctx.affinity().partitions();
+ int parts = grp.affinity().partitions();
long updSeq = updateSeq.incrementAndGet();
for (int part = 0; part < parts; part++) {
@@ -1527,9 +1533,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- checkEvictions(updSeq, cctx.affinity().assignments(topVer));
+ checkEvictions(updSeq, grp.affinity().assignments(topVer));
- cctx.needsRecovery(false);
+ grp.needsRecovery(false);
}
finally {
lock.writeLock().unlock();
@@ -1543,7 +1549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
Collection<Integer> res = null;
- int parts = cctx.affinity().partitions();
+ int parts = grp.affinity().partitions();
for (int part = 0; part < parts; part++) {
Set<UUID> nodeIds = part2node.get(part);
@@ -1580,7 +1586,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtLocalPartition locPart = locParts.get(p);
if (locPart != null) {
- if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId()))
+ if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId()))
locPart.moving();
}
@@ -1605,12 +1611,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return {@code True} if state changed.
*/
private boolean checkEvictions(long updateSeq) {
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion affVer = grp.affinity().lastVersion();
boolean changed = false;
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
changed = checkEvictions(updateSeq, aff);
@@ -1642,12 +1648,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return Checks if any of the local partitions need to be evicted.
*/
private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
- if (!cctx.kernalContext().state().active())
+ if (!ctx.kernalContext().state().active())
return false;
boolean changed = false;
- UUID locId = cctx.nodeId();
+ UUID locId = ctx.localNodeId();
for (int p = 0; p < locParts.length(); p++) {
GridDhtLocalPartition part = locParts.get(p);
@@ -1660,7 +1666,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state.active()) {
List<ClusterNode> affNodes = aff.get(p);
- if (!affNodes.contains(cctx.localNode())) {
+ if (!affNodes.contains(ctx.localNode())) {
List<ClusterNode> nodes = nodes(p, topVer, OWNING);
Collection<UUID> nodeIds = F.nodeIds(nodes);
@@ -1723,10 +1729,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
- assert oldest != null || cctx.kernalContext().clientNode();
+ assert oldest != null || ctx.kernalContext().clientNode();
// If this node became the oldest node.
- if (cctx.localNode().equals(oldest)) {
+ if (ctx.localNode().equals(oldest)) {
long seq = node2part.updateSequence();
if (seq != updateSeq) {
@@ -1753,7 +1759,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
if (node2part != null) {
- UUID locNodeId = cctx.localNodeId();
+ UUID locNodeId = ctx.localNodeId();
GridDhtPartitionMap map = node2part.get(locNodeId);
@@ -1790,9 +1796,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
ClusterNode oldest = discoCache.oldestAliveServerNode();
- assert oldest != null || cctx.kernalContext().clientNode();
+ assert oldest != null || ctx.kernalContext().clientNode();
- ClusterNode loc = cctx.localNode();
+ ClusterNode loc = ctx.localNode();
if (node2part != null) {
if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
@@ -1937,11 +1943,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
- ", cache=" + cctx.name() +
- ", started=" + cctx.started() +
+ ", grp=" + grp.name() +
", stopping=" + stopping +
- ", locNodeId=" + cctx.localNode().id() +
- ", locName=" + cctx.igniteInstanceName() + ']';
+ ", locNodeId=" + ctx.localNodeId() +
+ ", locName=" + ctx.igniteInstanceName() + ']';
for (GridDhtPartitionMap map : node2part.values()) {
if (map.hasMovingPartitions())
@@ -1957,8 +1962,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
- X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
- ", cache=" + cctx.name() + ']');
+ X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() +
+ ", grp=" + grp.name() + ']');
lock.readLock().lock();
@@ -1986,7 +1991,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return {@code True} if given partition belongs to local node.
*/
private boolean localNode(int part, List<List<ClusterNode>> aff) {
- return aff.get(part).contains(cctx.localNode());
+ return aff.get(part).contains(ctx.localNode());
}
/**
@@ -1997,7 +2002,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (node2part == null || !node2part.valid())
return;
- for (int i = 0; i < cctx.affinity().partitions(); i++) {
+ for (int i = 0; i < grp.affinity().partitions(); i++) {
List<ClusterNode> affNodes = aff.get(i);
// Topology doesn't contain server nodes (just clients).
@@ -2013,7 +2018,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = topVer;
if (log.isDebugEnabled())
- log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+ log.debug("Updated rebalanced version [cache=" + grp.name() + ", ver=" + rebalancedTopVer + ']');
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c91eb7a..9b61f14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -118,10 +118,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
@Override public void start() throws IgniteCheckedException {
super.start();
- preldr = new GridDhtPreloader(ctx);
-
- preldr.start();
-
ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
@Override public void apply(UUID nodeId, GridNearGetRequest req) {
processNearGetRequest(nodeId, req);
@@ -382,7 +378,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
- ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+ ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion());
if (keyFut == null || keyFut.isDone()) {
if (keyFut != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 2292cb2..10ed584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -899,7 +899,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
) {
assert keys != null;
- IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+ IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer);
// Prevent embedded future creation if possible.
if (keyFut == null || keyFut.isDone()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 75cbd00..505df91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -38,13 +38,16 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -78,7 +81,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
public class GridDhtPartitionDemander {
/** */
- private final GridCacheContext<?, ?> cctx;
+ private final GridCacheSharedContext<?, ?> ctx;
+
+ /** */
+ private final CacheGroupInfrastructure grp;
/** */
private final IgniteLogger log;
@@ -116,16 +122,18 @@ public class GridDhtPartitionDemander {
private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
/**
- * @param cctx Cctx.
+ * @param grp Cache group.
*/
- public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
- assert cctx != null;
+ public GridDhtPartitionDemander(CacheGroupInfrastructure grp) {
+ assert grp != null;
+
+ this.grp = grp;
- this.cctx = cctx;
+ ctx = grp.shared();
- log = cctx.logger(getClass());
+ log = ctx.logger(getClass());
- boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+ boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode();
rebalanceFut = new RebalanceFuture();//Dummy.
@@ -137,7 +145,7 @@ public class GridDhtPartitionDemander {
Map<Integer, Object> tops = new HashMap<>();
- for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++)
+ for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++)
tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx));
rebalanceTopics = tops;
@@ -196,7 +204,7 @@ public class GridDhtPartitionDemander {
GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
if (obj != null)
- cctx.time().removeTimeoutObject(obj);
+ ctx.time().removeTimeoutObject(obj);
final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
@@ -208,7 +216,7 @@ public class GridDhtPartitionDemander {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut);
+ IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut);
fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> future) {
@@ -237,7 +245,7 @@ public class GridDhtPartitionDemander {
*/
private boolean topologyChanged(RebalanceFuture fut) {
return
- !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+ !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed.
fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
}
@@ -249,7 +257,8 @@ public class GridDhtPartitionDemander {
private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
assert discoEvt != null;
- cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+ // TODO IGNITE-5075.
+ // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
/**
@@ -279,12 +288,12 @@ public class GridDhtPartitionDemander {
assert force == (forcedRebFut != null);
- long delay = cctx.config().getRebalanceDelay();
+ long delay = grp.config().getRebalanceDelay();
if (delay == 0 || force) {
final RebalanceFuture oldFut = rebalanceFut;
- final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
+ final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, startedEvtSent, stoppedEvtSent, cnt);
if (!oldFut.isInitial())
oldFut.cancel();
@@ -330,7 +339,7 @@ public class GridDhtPartitionDemander {
fut.onDone(true);
- ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+ ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
fut.sendRebalanceFinishedEvent();
@@ -381,7 +390,7 @@ public class GridDhtPartitionDemander {
GridTimeoutObject obj = lastTimeoutObj.get();
if (obj != null)
- cctx.time().removeTimeoutObject(obj);
+ ctx.time().removeTimeoutObject(obj);
final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
@@ -391,7 +400,7 @@ public class GridDhtPartitionDemander {
@Override public void onTimeout() {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
- cctx.shared().exchange().forceRebalance(exchFut);
+ ctx.exchange().forceRebalance(exchFut);
}
});
}
@@ -399,7 +408,7 @@ public class GridDhtPartitionDemander {
lastTimeoutObj.set(obj);
- cctx.time().addTimeoutObject(obj);
+ ctx.time().addTimeoutObject(obj);
}
return null;
@@ -433,17 +442,19 @@ public class GridDhtPartitionDemander {
Collection<Integer> parts= e.getValue().partitions();
- assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
+ assert parts != null : "Partitions are null [grp=" + grp.name() + ", fromNode=" + nodeId + "]";
fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
}
}
+ final CacheConfiguration cfg = grp.config();
+
+ int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize();
+
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
final ClusterNode node = e.getKey();
- final CacheConfiguration cfg = cctx.config();
-
final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
GridDhtPartitionDemandMessage d = e.getValue();
@@ -452,8 +463,6 @@ public class GridDhtPartitionDemander {
", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
- int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
for (int cnt = 0; cnt < lsnrCnt; cnt++)
@@ -473,15 +482,15 @@ public class GridDhtPartitionDemander {
initD.topic(rebalanceTopics.get(cnt));
initD.updateSequence(fut.updateSeq);
- initD.timeout(cctx.config().getRebalanceTimeout());
+ initD.timeout(grp.config().getRebalanceTimeout());
synchronized (fut) {
if (fut.isDone())
return;// Future can be already cancelled at this moment and all failovers happened.
// New requests will not be covered by failovers.
- cctx.io().sendOrderedMessage(node,
- rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
+ ctx.io().sendOrderedMessage(node,
+ rebalanceTopics.get(cnt), initD, grp.ioPolicy(), initD.timeout());
}
@@ -505,11 +514,11 @@ public class GridDhtPartitionDemander {
for (Integer part : parts) {
try {
- if (cctx.shared().database().persistenceEnabled()) {
+ if (ctx.database().persistenceEnabled()) {
if (partCntrs == null)
partCntrs = new HashMap<>(parts.size(), 1.0f);
- GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false);
+ GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false);
partCntrs.put(part, p.initialUpdateCounter());
}
@@ -585,7 +594,7 @@ public class GridDhtPartitionDemander {
final RebalanceFuture fut = rebalanceFut;
- ClusterNode node = cctx.node(id);
+ ClusterNode node = ctx.node(id);
if (node == null)
return;
@@ -609,14 +618,16 @@ public class GridDhtPartitionDemander {
return;
}
- final GridDhtPartitionTopology top = cctx.dht().topology();
+ final GridDhtPartitionTopology top = grp.topology();
try {
+ AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
int p = e.getKey();
- if (cctx.affinity().partitionLocalNode(p, topVer)) {
+ if (aff.get(p).contains(ctx.localNode())) {
GridDhtLocalPartition part = top.localPartition(p, topVer, true);
assert part != null;
@@ -627,7 +638,7 @@ public class GridDhtPartitionDemander {
boolean reserved = part.reserve();
assert reserved : "Failed to reserve partition [igniteInstanceName=" +
- cctx.igniteInstanceName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+ ctx.igniteInstanceName() + ", grp=" + grp.name() + ", part=" + part + ']';
part.lock();
@@ -686,7 +697,7 @@ public class GridDhtPartitionDemander {
// Only request partitions based on latest topology version.
for (Integer miss : supply.missed()) {
- if (cctx.affinity().partitionLocalNode(miss, topVer))
+ if (aff.get(miss).contains(ctx.localNode()))
fut.partitionMissed(id, miss);
}
@@ -696,14 +707,14 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
- d.timeout(cctx.config().getRebalanceTimeout());
+ d.timeout(grp.config().getRebalanceTimeout());
d.topic(rebalanceTopics.get(idx));
if (!topologyChanged(fut) && !fut.isDone()) {
// Send demand message.
- cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
- d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+ ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
+ d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
}
}
catch (IgniteCheckedException e) {
@@ -732,12 +743,15 @@ public class GridDhtPartitionDemander {
GridCacheEntryInfo entry,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
try {
GridCacheEntryEx cached = null;
try {
+ // TODO IGNITE-5075.
+ GridCacheContext cctx = grp.cacheContext();
+
cached = cctx.dht().entryEx(entry.key());
if (log.isDebugEnabled())
@@ -789,10 +803,10 @@ public class GridDhtPartitionDemander {
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
- cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+ ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
return true;
@@ -810,6 +824,12 @@ public class GridDhtPartitionDemander {
/** */
private static final long serialVersionUID = 1L;
+ /** */
+ private final GridCacheSharedContext<?, ?> ctx;
+
+ /** */
+ private final CacheGroupInfrastructure grp;
+
/** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
private final AtomicBoolean startedEvtSent;
@@ -817,9 +837,6 @@ public class GridDhtPartitionDemander {
private final AtomicBoolean stoppedEvtSent;
/** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
private final IgniteLogger log;
/** Remaining. T2: startTime, partitions */
@@ -840,14 +857,15 @@ public class GridDhtPartitionDemander {
/**
* @param assigns Assigns.
- * @param cctx Context.
+ * @param grp Cache group.
* @param log Logger.
* @param startedEvtSent Start event sent flag.
* @param stoppedEvtSent Stop event sent flag.
* @param updateSeq Update sequence.
*/
- RebalanceFuture(GridDhtPreloaderAssignments assigns,
- GridCacheContext<?, ?> cctx,
+ RebalanceFuture(
+ CacheGroupInfrastructure grp,
+ GridDhtPreloaderAssignments assigns,
IgniteLogger log,
AtomicBoolean startedEvtSent,
AtomicBoolean stoppedEvtSent,
@@ -856,20 +874,23 @@ public class GridDhtPartitionDemander {
this.exchFut = assigns.exchangeFuture();
this.topVer = assigns.topologyVersion();
- this.cctx = cctx;
+ this.grp = grp;
this.log = log;
this.startedEvtSent = startedEvtSent;
this.stoppedEvtSent = stoppedEvtSent;
this.updateSeq = updateSeq;
+
+ ctx= grp.shared();
}
/**
* Dummy future. Will be done by real one.
*/
- public RebalanceFuture() {
+ RebalanceFuture() {
this.exchFut = null;
this.topVer = null;
- this.cctx = null;
+ this.ctx = null;
+ this.grp = null;
this.log = null;
this.startedEvtSent = null;
this.stoppedEvtSent = null;
@@ -910,7 +931,7 @@ public class GridDhtPartitionDemander {
U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
- if (!cctx.kernalContext().isStopping()) {
+ if (!ctx.kernalContext().isStopping()) {
for (UUID nodeId : remaining.keySet())
cleanupRemoteContexts(nodeId);
}
@@ -931,7 +952,7 @@ public class GridDhtPartitionDemander {
if (isDone())
return;
- U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+ U.log(log, ("Cancelled rebalancing [grp=" + grp.name() +
", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
@@ -966,7 +987,7 @@ public class GridDhtPartitionDemander {
* @param nodeId Node id.
*/
private void cleanupRemoteContexts(UUID nodeId) {
- ClusterNode node = cctx.discovery().node(nodeId);
+ ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
return;
@@ -974,14 +995,14 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
- d.timeout(cctx.config().getRebalanceTimeout());
+ d.timeout(grp.config().getRebalanceTimeout());
try {
- for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+ for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
- d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+ ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
}
}
catch (IgniteCheckedException ignored) {
@@ -999,20 +1020,21 @@ public class GridDhtPartitionDemander {
if (isDone())
return;
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
+ // TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+// preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+// exchFut.discoveryEvent());
T2<Long, Collection<Integer>> t = remaining.get(nodeId);
- assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
+ assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId +
", part=" + p + "]";
Collection<Integer> parts = t.get2();
boolean rmvd = parts.remove(p);
- assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
+ assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId +
", part=" + p + ", left=" + parts + "]";
if (parts.isEmpty()) {
@@ -1035,7 +1057,8 @@ public class GridDhtPartitionDemander {
private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
assert discoEvt != null;
- cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+ // TODO IGNITE-5075.
+ // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
/**
@@ -1063,7 +1086,7 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Completed rebalance future: " + this);
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
Collection<Integer> m = new HashSet<>();
@@ -1077,13 +1100,13 @@ public class GridDhtPartitionDemander {
onDone(false); //Finished but has missed partitions, will force dummy exchange
- cctx.shared().exchange().forceDummyExchange(true, exchFut);
+ ctx.exchange().forceDummyExchange(true, exchFut);
return;
}
- if (!cancelled && !cctx.preloader().syncFuture().isDone())
- ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+ if (!cancelled && !grp.preloader().syncFuture().isDone())
+ ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
onDone(!cancelled);
}
@@ -1093,24 +1116,26 @@ public class GridDhtPartitionDemander {
*
*/
private void sendRebalanceStartedEvent() {
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
- (!cctx.isReplicated() || !startedEvtSent.get())) {
- preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
-
- startedEvtSent.set(true);
- }
+ // TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
+// (!cctx.isReplicated() || !startedEvtSent.get())) {
+// preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
+//
+// startedEvtSent.set(true);
+// }
}
/**
*
*/
private void sendRebalanceFinishedEvent() {
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
- (!cctx.isReplicated() || !stoppedEvtSent.get())) {
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
- stoppedEvtSent.set(true);
- }
+ // TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
+// (!cctx.isReplicated() || !stoppedEvtSent.get())) {
+// preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+//
+// stoppedEvtSent.set(true);
+// }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f7f0aff..84c3d23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -26,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
@@ -47,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
*/
class GridDhtPartitionSupplier {
/** */
- private final GridCacheContext<?, ?> cctx;
+ private final CacheGroupInfrastructure grp;
/** */
private final IgniteLogger log;
@@ -65,18 +67,18 @@ class GridDhtPartitionSupplier {
private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
*/
- GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
- assert cctx != null;
+ GridDhtPartitionSupplier(CacheGroupInfrastructure grp) {
+ assert grp != null;
- this.cctx = cctx;
+ this.grp = grp;
- log = cctx.logger(getClass());
+ log = grp.shared().logger(getClass());
- top = cctx.dht().topology();
+ top = grp.topology();
- depEnabled = cctx.gridDeploy().enabled();
+ depEnabled = grp.shared().gridDeploy().enabled();
}
/**
@@ -171,7 +173,7 @@ class GridDhtPartitionSupplier {
assert d != null;
assert id != null;
- AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion cutTop = grp.affinity().lastVersion();
AffinityTopologyVersion demTop = d.topologyVersion();
T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
@@ -199,7 +201,7 @@ class GridDhtPartitionSupplier {
GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
- ClusterNode node = cctx.discovery().node(id);
+ ClusterNode node = grp.shared().discovery().node(id);
if (node == null)
return; // Context will be cleaned at topology change.
@@ -225,7 +227,7 @@ class GridDhtPartitionSupplier {
boolean newReq = true;
- long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
+ long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount();
if (sctx != null) {
phase = sctx.phase;
@@ -234,7 +236,7 @@ class GridDhtPartitionSupplier {
}
else {
if (log.isDebugEnabled())
- log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+ log.debug("Starting supplying rebalancing [grp=" + grp.name() +
", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
", idx=" + idx + "]");
@@ -280,7 +282,7 @@ class GridDhtPartitionSupplier {
IgniteRebalanceIterator iter;
if (sctx == null || sctx.entryIt == null) {
- iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
+ iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
if (!iter.historical())
s.clean(part);
@@ -289,7 +291,9 @@ class GridDhtPartitionSupplier {
iter = (IgniteRebalanceIterator)sctx.entryIt;
while (iter.hasNext()) {
- if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
+ List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part);
+
+ if (!nodes.contains(node)) {
// Demander no longer needs this partition,
// so we send '-1' partition and move on.
s.missed(part);
@@ -313,7 +317,7 @@ class GridDhtPartitionSupplier {
break;
}
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (s.messageSize() >= grp.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
saveSupplyContext(scId,
phase,
@@ -400,7 +404,7 @@ class GridDhtPartitionSupplier {
reply(node, d, s, scId);
if (log.isDebugEnabled())
- log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+ log.debug("Finished supplying rebalancing [grp=" + grp.name() +
", fromNode=" + node.id() +
", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
", idx=" + idx + "]");
@@ -427,16 +431,15 @@ class GridDhtPartitionSupplier {
GridDhtPartitionSupplyMessage s,
T3<UUID, Integer, AffinityTopologyVersion> scId)
throws IgniteCheckedException {
-
try {
if (log.isDebugEnabled())
log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+ grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout());
// Throttle preloading.
- if (cctx.config().getRebalanceThrottle() > 0)
- U.sleep(cctx.config().getRebalanceThrottle());
+ if (grp.config().getRebalanceThrottle() > 0)
+ U.sleep(grp.config().getRebalanceThrottle());
return true;
}
@@ -469,7 +472,7 @@ class GridDhtPartitionSupplier {
AffinityTopologyVersion topVer,
long updateSeq) {
synchronized (scMap) {
- if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+ if (grp.affinity().lastVersion().equals(topVer)) {
assert scMap.get(t) == null;
scMap.put(t,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index ee461ab..016ec6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -206,32 +206,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
* @param ctx Cache context.
* @throws IgniteCheckedException If failed.
*/
- void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
- assert info != null;
-
- marshalInfo(info, ctx);
-
- msgSize += info.marshalledSize(ctx);
-
- CacheEntryInfoCollection infoCol = infos().get(p);
-
- if (infoCol == null) {
- msgSize += 4;
-
- infos().put(p, infoCol = new CacheEntryInfoCollection());
-
- infoCol.init();
- }
-
- infoCol.add(info);
- }
-
- /**
- * @param p Partition.
- * @param info Entry to add.
- * @param ctx Cache context.
- * @throws IgniteCheckedException If failed.
- */
void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
assert info != null;
assert (info.key() != null || info.keyBytes() != null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 74bbcb0..3f74f75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -87,10 +87,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @return Parition update counters.
*/
- public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId);
+ public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
/**
* @return Last used version among all nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 499537d..1e656b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscovery
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -644,7 +645,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
- top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+ top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId()));
}
/**
@@ -738,14 +739,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (crd != null) {
if (crd.isLocal()) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- boolean updateTop = !cacheCtx.isLocal() &&
- exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ boolean updateTop = !grp.isLocal() &&
+ exchId.topologyVersion().equals(grp.localStartVersion());
if (updateTop) {
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- if (top.cacheId() == cacheCtx.cacheId()) {
- cacheCtx.topology().update(exchId,
+ if (top.groupId() == grp.groupId()) {
+ grp.topology().update(exchId,
top.partitionMap(true),
top.updateCounters(false));
@@ -766,8 +767,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
else {
if (centralizedAff) { // Last server node failed.
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ GridAffinityAssignmentCache aff = grp.affinity();
aff.initialize(topologyVersion(), aff.idealAssignment());
}
@@ -1009,6 +1010,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
U.dumpThreads(log);
}
+ public boolean cacheGroupStopping(int grpId) {
+ return exchActions != null && exchActions.cacheGroupStopping(grpId);
+ }
+
/**
* @param cacheId Cache ID to check.
* @return {@code True} if cache is stopping by this exchange.
@@ -1456,9 +1461,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) {
- assert e.getValue().partitionUpdateCounters(top.cacheId()) != null;
+ assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
- for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) {
+ for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
int p = e0.getKey();
UUID uuid = e.getKey();
@@ -1755,19 +1760,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.versions().onExchange(msg.lastVersion().order());
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
- Integer cacheId = entry.getKey();
+ Integer grpId = entry.getKey();
- Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId);
+ Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
- if (cacheCtx != null)
- cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
+ if (grp != null)
+ grp.topology().update(exchId, entry.getValue(), cntrMap);
else {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
- cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
+ cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap);
}
}
}
@@ -1781,13 +1786,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
msgs.put(node.id(), msg);
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
- Integer cacheId = entry.getKey();
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ Integer grpId = entry.getKey();
+ CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
- GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
- cctx.exchange().clientTopology(cacheId, this);
+ GridDhtPartitionTopology top = grp != null ? grp.topology() :
+ cctx.exchange().clientTopology(grpId, this);
- top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+ top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(grpId));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 73a3481..b327ad1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -122,24 +122,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @return {@code True} if message contains full map for given cache group.
*/
- public boolean containsCache(int cacheId) {
- return parts != null && parts.containsKey(cacheId);
+ public boolean containsGroup(int grpId) {
+ return parts != null && parts.containsKey(grpId);
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param fullMap Full partitions map.
* @param dupDataCache Optional ID of cache with the same partition state map.
*/
- public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
+ public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
if (parts == null)
parts = new HashMap<>();
- if (!parts.containsKey(cacheId)) {
- parts.put(cacheId, fullMap);
+ if (!parts.containsKey(grpId)) {
+ parts.put(grpId, fullMap);
if (dupDataCache != null) {
assert compress;
@@ -148,30 +148,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (dupPartsData == null)
dupPartsData = new HashMap<>();
- dupPartsData.put(cacheId, dupDataCache);
+ dupPartsData.put(grpId, dupDataCache);
}
}
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param cntrMap Partition update counters.
*/
- public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+ public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
if (partCntrs == null)
partCntrs = new HashMap<>();
- if (!partCntrs.containsKey(cacheId))
- partCntrs.put(cacheId, cntrMap);
+ if (!partCntrs.containsKey(grpId))
+ partCntrs.put(grpId, cntrMap);
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @return Partition update counters.
*/
- @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
+ @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
if (partCntrs != null) {
- Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
+ Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
}
@@ -427,7 +427,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
return 46;
}
- //todo
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 11;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index e197864..f2c9158 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -127,7 +128,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
DiscoveryEvent e = (DiscoveryEvent)evt;
try {
- ClusterNode loc = cctx.localNode();
+ ClusterNode loc = ctx.localNode();
assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED;
@@ -148,12 +149,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
};
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
*/
- public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
- super(cctx);
+ public GridDhtPreloader(CacheGroupInfrastructure grp) {
+ super(grp);
- top = cctx.dht().topology();
+ top = grp.topology();
startFut = new GridFutureAdapter<>();
}
@@ -163,26 +164,26 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (log.isDebugEnabled())
log.debug("Starting DHT rebalancer...");
- cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class,
+ ctx.io().addHandler(grp.groupId(), GridDhtForceKeysRequest.class,
new MessageHandler<GridDhtForceKeysRequest>() {
@Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
processForceKeysRequest(node, msg);
}
});
- cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class,
+ ctx.io().addHandler(grp.groupId(), GridDhtForceKeysResponse.class,
new MessageHandler<GridDhtForceKeysResponse>() {
@Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
processForceKeyResponse(node, msg);
}
});
- supplier = new GridDhtPartitionSupplier(cctx);
+ supplier = new GridDhtPartitionSupplier(grp);
demander = new GridDhtPartitionDemander(cctx);
demander.start();
- cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
/** {@inheritDoc} */
@@ -203,7 +204,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
stopping = true;
- cctx.events().removeListener(discoLsnr);
+ ctx.gridEvents().removeLocalEventListener(discoLsnr);
// Acquire write busy lock.
busyLock.writeLock().lock();
@@ -253,25 +254,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
// No assignments for disabled preloader.
- GridDhtPartitionTopology top = cctx.dht().topology();
+ GridDhtPartitionTopology top = grp.topology();
- if (!cctx.rebalanceEnabled() || !cctx.shared().kernalContext().state().active())
+ if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active())
return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
- int partCnt = cctx.affinity().partitions();
+ int partCnt = grp.affinity().partitions();
assert exchFut.forcePreload() || exchFut.dummyReassign() ||
exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
"Topology version mismatch [exchId=" + exchFut.exchangeId() +
- ", cache=" + cctx.name() +
+ ", grp=" + grp.name() +
", topVer=" + top.topologyVersion() + ']';
GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
AffinityTopologyVersion topVer = assigns.topologyVersion();
+ AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
for (int p = 0; p < partCnt; p++) {
- if (cctx.shared().exchange().hasPendingExchange()) {
+ if (ctx.exchange().hasPendingExchange()) {
if (log.isDebugEnabled())
log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
exchFut.exchangeId());
@@ -282,7 +285,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
// If partition belongs to local node.
- if (cctx.affinity().partitionLocalNode(p, topVer)) {
+ if (aff.get(p).contains(ctx.localNode())) {
GridDhtLocalPartition part = top.localPartition(p, topVer, true);
assert part != null;
@@ -300,13 +303,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (picked.isEmpty()) {
top.own(part);
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- cctx.events().addPreloadEvent(p,
- EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
- discoEvt.type(), discoEvt.timestamp());
- }
+// TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+// DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+//
+// cctx.events().addPreloadEvent(p,
+// EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+// discoEvt.type(), discoEvt.timestamp());
+// }
if (log.isDebugEnabled())
log.debug("Owning partition as there are no other owners: " + part);
@@ -342,7 +346,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Picked owners.
*/
private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
- Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, topVer);
+ Collection<ClusterNode> affNodes = grp.affinity().cachedAffinity(topVer).get(p);
int affCnt = affNodes.size();
@@ -368,7 +372,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Nodes owning this partition.
*/
private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
- return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
+ return F.view(grp.topology().owners(p, topVer), F.remoteNodes(ctx.localNodeId()));
}
/** {@inheritDoc} */
@@ -423,12 +427,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+ return ctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
- return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
+ return ctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
}
/**
@@ -459,7 +463,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @param msg Force keys message.
*/
private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
- IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
+ IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
if (fut.isDone())
processForceKeysRequest0(node, msg);
@@ -480,6 +484,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
return;
try {
+ GridCacheContext cctx = ctx.cacheContext(msg.cacheId());
+
ClusterNode loc = cctx.localNode();
GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
@@ -591,11 +597,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
try {
top.onEvicted(part, updateSeq);
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
- cctx.events().addUnloadEvent(part.id());
+// TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+// cctx.events().addUnloadEvent(part.id());
if (updateSeq)
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
}
finally {
leaveBusy();
@@ -604,7 +611,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public boolean needForceKeys() {
- if (cctx.rebalanceEnabled()) {
+ if (grp.rebalanceEnabled()) {
IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result()))
@@ -615,12 +622,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+ @Override public IgniteInternalFuture<Object> request(GridCacheContext cctx,
+ GridNearAtomicAbstractUpdateRequest req,
AffinityTopologyVersion topVer) {
if (!needForceKeys())
return null;
- return request0(req.keys(), topVer);
+ return request0(cctx, req.keys(), topVer);
}
/**
@@ -628,11 +636,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Future for request.
*/
@SuppressWarnings({"unchecked", "RedundantCast"})
- @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ @Override public GridDhtFuture<Object> request(GridCacheContext cctx,
+ Collection<KeyCacheObject> keys,
+ AffinityTopologyVersion topVer) {
if (!needForceKeys())
return null;
- return request0(keys, topVer);
+ return request0(cctx, keys, topVer);
}
/**
@@ -641,7 +651,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Future for request.
*/
@SuppressWarnings({"unchecked", "RedundantCast"})
- private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
@@ -652,7 +662,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (topReadyFut == null)
startFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> syncFut) {
- cctx.kernalContext().closure().runLocalSafe(
+ ctx.kernalContext().closure().runLocalSafe(
new GridPlainRunnable() {
@Override public void run() {
fut.init();
@@ -689,7 +699,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
demandLock.writeLock().lock();
try {
- cctx.deploy().unwind(cctx);
+ // TODO IGNITE-5075.
+ // cctx.deploy().unwind(cctx);
}
finally {
demandLock.writeLock().unlock();
@@ -728,7 +739,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
partsToEvict.putIfAbsent(part.id(), part);
if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) {
- cctx.closures().callLocalSafe(new GPC<Boolean>() {
+ ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
@Override public Boolean call() {
boolean locked = true;
@@ -749,7 +760,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
partsToEvict.put(part.id(), part);
}
catch (Throwable ex) {
- if (cctx.kernalContext().isStopping()) {
+ if (ctx.kernalContext().isStopping()) {
LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
false,
true);
@@ -785,7 +796,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void dumpDebugInfo() {
if (!forceKeyFuts.isEmpty()) {
- U.warn(log, "Pending force key futures [cache=" + cctx.name() + "]:");
+ U.warn(log, "Pending force key futures [grp=" + grp.name() + "]:");
for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
U.warn(log, ">>> " + fut);
@@ -803,7 +814,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void apply(UUID nodeId, M msg) {
- ClusterNode node = cctx.node(nodeId);
+ ClusterNode node = ctx.node(nodeId);
if (node == null) {
if (log.isDebugEnabled())
[3/4] ignite git commit: ignite-5075
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 650f65e..8ddea9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.internal.NodeStoppingException;
@@ -81,7 +82,16 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
*
*/
@SuppressWarnings("PublicInnerClass")
-public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter implements IgniteCacheOffheapManager {
+public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
+ /** */
+ private GridCacheSharedContext ctx;
+
+ /** */
+ private CacheGroupInfrastructure grp;
+
+ /** */
+ private IgniteLogger log;
+
/** */
// TODO GG-11208 need restore size after restart.
private CacheDataStore locCacheDataStore;
@@ -116,25 +126,24 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override protected void start0() throws IgniteCheckedException {
- super.start0();
+ @Override public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException {
+ this.ctx = ctx;
+ this.grp = grp;
+ this.log = ctx.logger(getClass());
- updateValSizeThreshold = cctx.shared().database().pageSize() / 2;
+ updateValSizeThreshold = ctx.database().pageSize() / 2;
- if (cctx.affinityNode()) {
- cctx.shared().database().checkpointReadLock();
+ if (grp.affinityNode()) {
+ ctx.database().checkpointReadLock();
try {
initDataStructures();
- if (cctx.isLocal()) {
- assert cctx.cache() instanceof GridLocalCache : cctx.cache();
-
+ if (grp.isLocal())
locCacheDataStore = createCacheDataStore(0);
- }
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
}
}
@@ -143,32 +152,29 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
protected void initDataStructures() throws IgniteCheckedException {
- if (cctx.shared().ttl().eagerTtlEnabled()) {
+ if (ctx.ttl().eagerTtlEnabled()) {
String name = "PendingEntries";
long rootPage = allocateForTree();
- pendingEntries = new PendingEntriesTree(cctx,
+ pendingEntries = new PendingEntriesTree(
+ grp,
name,
- cctx.memoryPolicy().pageMemory(),
+ grp.memoryPolicy().pageMemory(),
rootPage,
- cctx.reuseList(),
+ grp.reuseList(),
true);
}
}
/** {@inheritDoc} */
- @Override protected void stop0(final boolean cancel, final boolean destroy) {
- super.stop0(cancel, destroy);
-
- if (destroy && cctx.affinityNode())
+ @Override public void stop(final boolean destroy) {
+ if (destroy && grp.affinityNode())
destroyCacheDataStructures(destroy);
}
/** {@inheritDoc} */
- @Override protected void onKernalStop0(boolean cancel) {
- super.onKernalStop0(cancel);
-
+ @Override public void onKernalStop() {
busyLock.block();
}
@@ -176,7 +182,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
*
*/
protected void destroyCacheDataStructures(boolean destroy) {
- assert cctx.affinityNode();
+ assert grp.affinityNode();
try {
if (locCacheDataStore != null)
@@ -198,7 +204,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @return Data store for given entry.
*/
public CacheDataStore dataStore(GridDhtLocalPartition part) {
- if (cctx.isLocal())
+ if (grp.isLocal())
return locCacheDataStore;
else {
assert part != null;
@@ -209,7 +215,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public long entriesCount() {
- if (cctx.isLocal())
+ if (grp.isLocal())
return locCacheDataStore.size();
long size = 0;
@@ -225,10 +231,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @return Partition data.
*/
@Nullable private CacheDataStore partitionData(int p) {
- if (cctx.isLocal())
+ if (grp.isLocal())
return locCacheDataStore;
else {
- GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false);
+ GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false);
return part != null ? part.dataStore() : null;
}
@@ -240,16 +246,19 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
boolean backup,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
- if (cctx.isLocal())
+ if (grp.isLocal())
return entriesCount(0);
else {
- ClusterNode locNode = cctx.localNode();
+ ClusterNode locNode = ctx.localNode();
long cnt = 0;
- for (GridDhtLocalPartition locPart : cctx.topology().currentLocalPartitions()) {
+ Set<Integer> primaryParts = grp.affinity().cachedAffinity(topVer).primaryPartitions(locNode.id());
+ Set<Integer> backupParts = grp.affinity().cachedAffinity(topVer).backupPartitions(locNode.id());
+
+ for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
if (primary) {
- if (cctx.affinity().primaryByPartition(locNode, locPart.id(), topVer)) {
+ if (primaryParts.contains(locPart.id())) {
cnt += locPart.dataStore().size();
continue;
@@ -257,7 +266,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
if (backup) {
- if (cctx.affinity().backupByPartition(locNode, locPart.id(), topVer))
+ if (backupParts.contains(locPart.id()))
cnt += locPart.dataStore().size();
}
}
@@ -268,13 +277,13 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public long entriesCount(int part) {
- if (cctx.isLocal()) {
+ if (grp.isLocal()) {
assert part == 0;
return locCacheDataStore.size();
}
else {
- GridDhtLocalPartition locPart = cctx.topology().localPartition(part, AffinityTopologyVersion.NONE, false);
+ GridDhtLocalPartition locPart = grp.topology().localPartition(part, AffinityTopologyVersion.NONE, false);
return locPart == null ? 0 : locPart.dataStore().size();
}
@@ -289,10 +298,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
private Iterator<CacheDataStore> cacheData(boolean primary, boolean backup, AffinityTopologyVersion topVer) {
assert primary || backup;
- if (cctx.isLocal())
+ if (grp.isLocal())
return Collections.singleton(locCacheDataStore).iterator();
else {
- final Iterator<GridDhtLocalPartition> it = cctx.topology().currentLocalPartitions().iterator();
+ final Iterator<GridDhtLocalPartition> it = grp.topology().currentLocalPartitions().iterator();
if (primary && backup) {
return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() {
@@ -302,8 +311,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}, true);
}
- final Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
+ final Set<Integer> parts = primary ? grp.affinity().primaryPartitions(ctx.localNodeId(), topVer) :
+ grp.affinity().backupPartitions(ctx.localNodeId(), topVer);
return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() {
@Override public CacheDataStore apply(GridDhtLocalPartition part) {
@@ -319,15 +328,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void invoke(KeyCacheObject key,
+ @Override public void invoke(
+ GridCacheContext cctx,
+ KeyCacheObject key,
GridDhtLocalPartition part,
OffheapInvokeClosure c)
throws IgniteCheckedException {
- dataStore(part).invoke(key, c);
+ dataStore(part).invoke(cctx, key, c);
}
/** {@inheritDoc} */
@Override public void update(
+ GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
@@ -338,16 +350,17 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
) throws IgniteCheckedException {
assert expireTime >= 0;
- dataStore(part).update(key, partId, val, ver, expireTime, oldRow);
+ dataStore(part).update(cctx, key, partId, val, ver, expireTime, oldRow);
}
/** {@inheritDoc} */
@Override public void remove(
+ GridCacheContext cctx,
KeyCacheObject key,
int partId,
GridDhtLocalPartition part
) throws IgniteCheckedException {
- dataStore(part).remove(key, partId);
+ dataStore(part).remove(cctx, key, partId);
}
/** {@inheritDoc} */
@@ -356,9 +369,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throws IgniteCheckedException {
KeyCacheObject key = entry.key();
- assert cctx.isLocal() || entry.localPartition() != null : entry;
+ assert grp.isLocal() || entry.localPartition() != null : entry;
- return dataStore(entry.localPartition()).find(key);
+ return dataStore(entry.localPartition()).find(entry.context(), key);
}
/** {@inheritDoc} */
@@ -394,7 +407,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param readers {@code True} to clear readers.
*/
@SuppressWarnings("unchecked")
- @Override public void clear(boolean readers) {
+ @Override public void clear(GridCacheContext cctx, boolean readers) {
GridCacheVersion obsoleteVer = null;
GridIterator<CacheDataRow> it = rowsIterator(true, true, null);
@@ -404,7 +417,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
try {
if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
+ obsoleteVer = ctx.versions().next();
GridCacheEntryEx entry = cctx.cache().entryEx(key);
@@ -439,7 +452,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(final boolean primary,
+ @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(
+ final GridCacheContext cctx,
+ final boolean primary,
final boolean backup,
final AffinityTopologyVersion topVer,
final boolean keepBinary) throws IgniteCheckedException {
@@ -622,12 +637,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
private long allocateForTree() throws IgniteCheckedException {
- ReuseList reuseList = cctx.reuseList();
+ ReuseList reuseList = grp.reuseList();
long pageId;
if (reuseList == null || (pageId = reuseList.takeRecycledPage()) == 0L)
- pageId = cctx.memoryPolicy().pageMemory().allocatePage(cctx.cacheId(), INDEX_PARTITION, FLAG_IDX);
+ pageId = grp.memoryPolicy().pageMemory().allocatePage(grp.groupId(), INDEX_PARTITION, FLAG_IDX);
return pageId;
}
@@ -636,7 +651,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
@Override public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException {
long pageId = allocateForTree();
- return new RootPage(new FullPageId(pageId, cctx.cacheId()), true);
+ return new RootPage(new FullPageId(pageId, grp.groupId()), true);
}
/** {@inheritDoc} */
@@ -646,7 +661,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public ReuseList reuseListForIndex(String idxName) {
- return cctx.reuseList();
+ return grp.reuseList();
}
/** {@inheritDoc} */
@@ -721,14 +736,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throws IgniteCheckedException {
final long rootPage = allocateForTree();
- CacheDataRowStore rowStore = new CacheDataRowStore(cctx, cctx.freeList(), p);
+ CacheDataRowStore rowStore = new CacheDataRowStore(grp, grp.freeList(), p);
String idxName = treeName(p);
- CacheDataTree dataTree = new CacheDataTree(idxName,
- cctx.reuseList(),
+ CacheDataTree dataTree = new CacheDataTree(
+ grp,
+ idxName,
+ grp.reuseList(),
rowStore,
- cctx,
rootPage,
true);
@@ -737,7 +753,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public Iterable<CacheDataStore> cacheDataStores() {
- if (cctx.isLocal())
+ if (grp.isLocal())
return Collections.singleton(locCacheDataStore);
return new Iterable<CacheDataStore>() {
@@ -786,18 +802,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (amount != -1 && cleared > amount)
return true;
-
- if (row.key.partition() == -1)
- row.key.partition(cctx.affinity().partition(row.key));
-
- assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
-
- if (pendingEntries.remove(row) != null) {
- if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
-
- c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
- }
+// TODO: IGNITE-5075.
+// if (row.key.partition() == -1)
+// row.key.partition(cctx.affinity().partition(row.key));
+//
+// assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+//
+// if (pendingEntries.remove(row) != null) {
+// if (obsoleteVer == null)
+// obsoleteVer = ctx.versions().next();
+//
+// c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
+// }
cleared++;
}
@@ -895,7 +911,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @return {@code True} if it is possible to update old row data.
* @throws IgniteCheckedException If failed.
*/
- private boolean canUpdateOldRow(@Nullable CacheDataRow oldRow, DataRow dataRow)
+ private boolean canUpdateOldRow(GridCacheContext cctx, @Nullable CacheDataRow oldRow, DataRow dataRow)
throws IgniteCheckedException {
if (oldRow == null || cctx.queries().enabled())
return false;
@@ -916,7 +932,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c)
+ @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
@@ -930,7 +946,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
CacheDataRow oldRow = c.oldRow();
- finishUpdate(c.newRow(), oldRow);
+ finishUpdate(cctx, c.newRow(), oldRow);
break;
}
@@ -938,7 +954,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
case REMOVE: {
CacheDataRow oldRow = c.oldRow();
- finishRemove(key, oldRow);
+ finishRemove(cctx, key, oldRow);
break;
}
@@ -956,18 +972,20 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public CacheDataRow createRow(KeyCacheObject key,
+ @Override public CacheDataRow createRow(
+ GridCacheContext cctx,
+ KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException
{
- int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
+ int cacheId = grp.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
0 : cctx.cacheId();
DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId);
- if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))
+ if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))
dataRow.link(oldRow.link());
else {
CacheObjectContext coCtx = cctx.cacheObjectContext();
@@ -984,7 +1002,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void update(KeyCacheObject key,
+ @Override public void update(
+ GridCacheContext cctx,
+ KeyCacheObject key,
int p,
CacheObject val,
GridCacheVersion ver,
@@ -996,7 +1016,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED ?
+ int cacheId = grp.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED ?
cctx.cacheId() : 0;
DataRow dataRow = new DataRow(key, val, ver, p, expireTime, cacheId);
@@ -1009,7 +1029,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
CacheDataRow old;
- if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) {
+ if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) {
old = oldRow;
dataRow.link(oldRow.link());
@@ -1028,7 +1048,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
old = dataTree.put(dataRow);
}
- finishUpdate(dataRow, old);
+ finishUpdate(cctx, dataRow, old);
}
finally {
busyLock.leaveBusy();
@@ -1040,7 +1060,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param oldRow Old row if available.
* @throws IgniteCheckedException If failed.
*/
- private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow)
+ throws IgniteCheckedException {
if (oldRow == null)
storageSize.incrementAndGet();
@@ -1085,18 +1106,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
hasPendingEntries = true;
}
- updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), newRow.value());
+ updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value());
}
/** {@inheritDoc} */
- @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException {
+ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
CacheDataRow oldRow = dataTree.remove(new SearchRow(key));
- finishRemove(key, oldRow);
+ finishRemove(cctx, key, oldRow);
}
finally {
busyLock.leaveBusy();
@@ -1108,7 +1129,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param oldRow Removed row.
* @throws IgniteCheckedException If failed.
*/
- private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
CacheObject val = null;
GridCacheVersion ver = null;
@@ -1133,11 +1154,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (oldRow != null)
rowStore.removeRow(oldRow.link());
- updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), null);
+ updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null);
}
/** {@inheritDoc} */
- @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException {
+ @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
key.valueBytes(cctx.cacheObjectContext());
CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1145,7 +1166,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (row != null) {
row.key(key);
- cctx.memoryPolicy().evictionTracker().touchPage(row.link());
+ grp.memoryPolicy().evictionTracker().touchPage(row.link());
}
return row;
@@ -1235,6 +1256,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param newVal New value.
*/
private void updateIgfsMetrics(
+ GridCacheContext cctx,
KeyCacheObject key,
CacheObject oldVal,
CacheObject newVal
@@ -1242,11 +1264,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
// In case we deal with IGFS cache, count updated data
if (cctx.cache().isIgfsDataCache() &&
!cctx.isNear() &&
- cctx.kernalContext()
+ ctx.kernalContext()
.igfsHelper()
.isIgfsBlockKey(key.value(cctx.cacheObjectContext(), false))) {
- int oldSize = valueLength(oldVal);
- int newSize = valueLength(newVal);
+ int oldSize = valueLength(cctx, oldVal);
+ int newSize = valueLength(cctx, newVal);
int delta = newSize - oldSize;
@@ -1261,7 +1283,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param val Value.
* @return Length of value.
*/
- private int valueLength(@Nullable CacheObject val) {
+ private int valueLength(GridCacheContext cctx, @Nullable CacheObject val) {
if (val == null)
return 0;
@@ -1333,7 +1355,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
try {
// We can not init data row lazily because underlying buffer can be concurrently cleared.
- initFromLink(cctx, rowData);
+ initFromLink(grp, rowData);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -1386,30 +1408,29 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
private final CacheDataRowStore rowStore;
/** */
- private final GridCacheContext cctx;
+ private final CacheGroupInfrastructure grp;
/**
* @param name Tree name.
* @param reuseList Reuse list.
* @param rowStore Row store.
- * @param cctx Context.
* @param metaPageId Meta page ID.
* @param initNew Initialize new index.
* @throws IgniteCheckedException If failed.
*/
public CacheDataTree(
+ CacheGroupInfrastructure grp,
String name,
ReuseList reuseList,
CacheDataRowStore rowStore,
- GridCacheContext cctx,
long metaPageId,
boolean initNew
) throws IgniteCheckedException {
super(name,
- cctx.cacheId(),
- cctx.memoryPolicy().pageMemory(),
- cctx.shared().wal(),
- cctx.offheap().globalRemoveId(),
+ grp.groupId(),
+ grp.memoryPolicy().pageMemory(),
+ grp.shared().wal(),
+ grp.offheap().globalRemoveId(),
metaPageId,
reuseList,
DataInnerIO.VERSIONS,
@@ -1418,7 +1439,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
assert rowStore != null;
this.rowStore = rowStore;
- this.cctx = cctx;
+ this.grp = grp;
initTree(initNew);
}
@@ -1460,7 +1481,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
private int compareKeys(KeyCacheObject key, final long link) throws IgniteCheckedException {
- byte[] bytes = key.valueBytes(cctx.cacheObjectContext());
+ byte[] bytes = key.valueBytes(grp.cacheObjectContext());
final long pageId = pageId(link);
final long page = acquirePage(pageId);
@@ -1479,7 +1500,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (data.nextLink() == 0) {
long addr = pageAddr + data.offset();
- if (cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED)
+ if (grp.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED)
addr += 4; // Skip cache id.
final int len = PageUtils.getInt(addr, 0);
@@ -1526,10 +1547,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
// TODO GG-11768.
CacheDataRowAdapter other = new CacheDataRowAdapter(link);
- other.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY);
+ other.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
- byte[] bytes1 = other.key().valueBytes(cctx.cacheObjectContext());
- byte[] bytes2 = key.valueBytes(cctx.cacheObjectContext());
+ byte[] bytes1 = other.key().valueBytes(grp.cacheObjectContext());
+ byte[] bytes2 = key.valueBytes(grp.cacheObjectContext());
int lenCmp = Integer.compare(bytes1.length, bytes2.length);
@@ -1571,11 +1592,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
private final int partId;
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
* @param freeList Free list.
*/
- public CacheDataRowStore(GridCacheContext<?, ?> cctx, FreeList freeList, int partId) {
- super(cctx, freeList);
+ public CacheDataRowStore(CacheGroupInfrastructure grp, FreeList freeList, int partId) {
+ super(grp, freeList);
this.partId = partId;
}
@@ -1774,19 +1795,19 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/**
- * @param cctx Context.
+ * @param grp Cache group.
* @param expireTime Expire time.
* @param link Link.
* @return Row.
* @throws IgniteCheckedException If failed.
*/
- static PendingRow createRowWithKey(GridCacheContext cctx, long expireTime, long link)
+ static PendingRow createRowWithKey(CacheGroupInfrastructure grp, long expireTime, long link)
throws IgniteCheckedException {
PendingRow row = new PendingRow(expireTime, link);
CacheDataRowAdapter rowData = new CacheDataRowAdapter(link);
- rowData.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY);
+ rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
row.key = rowData.key();
@@ -1804,10 +1825,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
*/
protected static class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> {
/** */
- private final GridCacheContext cctx;
+ private final CacheGroupInfrastructure grp;
/**
- * @param cctx Cache context.
* @param name Tree name.
* @param pageMem Page memory.
* @param metaPageId Meta page ID.
@@ -1816,7 +1836,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
public PendingEntriesTree(
- GridCacheContext cctx,
+ CacheGroupInfrastructure grp,
String name,
PageMemory pageMem,
long metaPageId,
@@ -1824,16 +1844,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
boolean initNew)
throws IgniteCheckedException {
super(name,
- cctx.cacheId(),
+ grp.groupId(),
pageMem,
- cctx.shared().wal(),
- cctx.offheap().globalRemoveId(),
+ grp.shared().wal(),
+ grp.offheap().globalRemoveId(),
metaPageId,
reuseList,
PendingEntryInnerIO.VERSIONS,
PendingEntryLeafIO.VERSIONS);
- this.cctx = cctx;
+ this.grp = grp;
initTree(initNew);
}
@@ -1925,7 +1945,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
- return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx,
+ return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
getExpireTime(pageAddr, idx),
getLink(pageAddr, idx));
}
@@ -1984,7 +2004,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
- return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx,
+ return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
getExpireTime(pageAddr, idx),
getLink(pageAddr, idx));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index afeada5..0ce0a0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -94,25 +95,25 @@ public class CacheDataRowAdapter implements CacheDataRow {
/**
* Read row from data pages.
*
- * @param cctx Cache context.
+ * @param grp Cache group.
* @param rowData Required row data.
* @throws IgniteCheckedException If failed.
*/
- public final void initFromLink(GridCacheContext<?, ?> cctx, RowData rowData) throws IgniteCheckedException {
- initFromLink(cctx, cctx.shared(), cctx.memoryPolicy().pageMemory(), rowData);
+ public final void initFromLink(CacheGroupInfrastructure grp, RowData rowData) throws IgniteCheckedException {
+ initFromLink(grp, grp.shared(), grp.memoryPolicy().pageMemory(), rowData);
}
/**
* Read row from data pages.
* Can be called with cctx == null, if cache instance is unknown, but its ID is stored in the data row.
*
- * @param cctx Cctx.
+ * @param grp Cache group.
* @param sharedCtx Shared context.
* @param pageMem Page memory.
* @param rowData Row data.
*/
public final void initFromLink(
- @Nullable GridCacheContext<?, ?> cctx,
+ @Nullable CacheGroupInfrastructure grp,
GridCacheSharedContext<?, ?> sharedCtx,
PageMemory pageMem,
RowData rowData)
@@ -122,11 +123,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
CacheObjectContext coctx = null;
- if (cctx != null) {
- cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
- cctx.cacheId() : 0; // Force cacheId reading for evictable memory policies.
+ if (grp != null) {
+ cacheId = grp.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
+ -1 : 0; // Force cacheId reading for evictable memory policies.
- coctx = cctx.cacheObjectContext();
+ coctx = grp.cacheObjectContext();
}
long nextLink = link;
@@ -401,7 +402,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param buf Buffer.
* @param incomplete Incomplete object.
* @return Incomplete object.
- * @throws IgniteCheckedException If failed.
*/
private IncompleteObject<?> readIncompleteExpireTime(
ByteBuffer buf,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
index 1c4c89e..233fa55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.cache.database;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
/**
@@ -34,24 +34,20 @@ public class RowStore {
protected final PageMemory pageMem;
/** */
- protected final GridCacheContext<?,?> cctx;
-
- /** */
protected final CacheObjectContext coctx;
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
* @param freeList Free list.
*/
- public RowStore(GridCacheContext<?,?> cctx, FreeList freeList) {
- assert cctx != null;
+ public RowStore(CacheGroupInfrastructure grp, FreeList freeList) {
+ assert grp != null;
assert freeList != null;
- this.cctx = cctx;
this.freeList = freeList;
- coctx = cctx.cacheObjectContext();
- pageMem = cctx.memoryPolicy().pageMemory();
+ coctx = grp.cacheObjectContext();
+ pageMem = grp.memoryPolicy().pageMemory();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index 357bf89..1fa909b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -39,14 +40,13 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap {
/** Context. */
- private final GridCacheContext ctx;
+ private final CacheGroupInfrastructure grp;
/**
- * Constructor.
- * @param ctx Context.
+ * @param grp Cache group.
*/
- public GridCachePartitionedConcurrentMap(GridCacheContext ctx) {
- this.ctx = ctx;
+ GridCachePartitionedConcurrentMap(CacheGroupInfrastructure grp) {
+ this.grp = grp;
}
/**
@@ -56,6 +56,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
* @return Local partition.
*/
@Nullable private GridDhtLocalPartition localPartition(
+ GridCacheContext cctx,
KeyCacheObject key,
AffinityTopologyVersion topVer,
boolean create
@@ -63,31 +64,31 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
int p = key.partition();
if (p == -1)
- p = ctx.affinity().partition(key);
+ p = cctx.affinity().partition(key);
- return ctx.topology().localPartition(p, topVer, create);
+ return grp.topology().localPartition(p, topVer, create);
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
- GridDhtLocalPartition part = localPartition(key, AffinityTopologyVersion.NONE, false);
+ @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) {
+ GridDhtLocalPartition part = localPartition(ctx, key, AffinityTopologyVersion.NONE, false);
if (part == null)
return null;
- return part.getEntry(key);
+ return part.getEntry(ctx, key);
}
/** {@inheritDoc} */
- @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
+ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key,
@Nullable CacheObject val, boolean create, boolean touch) {
while (true) {
- GridDhtLocalPartition part = localPartition(key, topVer, create);
+ GridDhtLocalPartition part = localPartition(ctx, key, topVer, create);
if (part == null)
return null;
- GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch);
+ GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(ctx, topVer, key, val, create, touch);
if (res != null || !create)
return res;
@@ -100,7 +101,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
@Override public int size() {
int size = 0;
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
size += part.size();
return size;
@@ -110,7 +111,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
@Override public int publicSize() {
int size = 0;
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
size += part.publicSize();
return size;
@@ -118,17 +119,17 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
/** {@inheritDoc} */
@Override public void incrementPublicSize(GridCacheEntryEx e) {
- localPartition(e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
+ localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
}
/** {@inheritDoc} */
@Override public void decrementPublicSize(GridCacheEntryEx e) {
- localPartition(e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
+ localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
}
/** {@inheritDoc} */
@Override public boolean removeEntry(GridCacheEntryEx entry) {
- GridDhtLocalPartition part = localPartition(entry.key(), AffinityTopologyVersion.NONE, false);
+ GridDhtLocalPartition part = localPartition(entry.context(), entry.key(), AffinityTopologyVersion.NONE, false);
if (part == null)
return false;
@@ -185,7 +186,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
*/
private abstract class PartitionedIterator<T> implements Iterator<T> {
/** Partitions iterator. */
- private Iterator<GridDhtLocalPartition> partsIter = ctx.topology().currentLocalPartitions().iterator();
+ private Iterator<GridDhtLocalPartition> partsIter = grp.topology().currentLocalPartitions().iterator();
/** Current partition iterator. */
private Iterator<T> currIter = partsIter.hasNext() ? iterator(partsIter.next()) :
@@ -249,7 +250,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
@Override public int size() {
int size = 0;
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
size += set(part).size();
return size;
@@ -257,7 +258,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
/** {@inheritDoc} */
@Override public boolean contains(Object o) {
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) {
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
if (set(part).contains(o))
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f3c3a1b..ef7221f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -71,7 +71,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
private GridCacheSharedContext cctx;
/** Cache ID. */
- private int cacheId;
+ private int grpId;
/** Logger. */
private final IgniteLogger log;
@@ -111,18 +111,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/**
* @param cctx Context.
- * @param cacheId Cache ID.
+ * @param grpId Group ID.
* @param exchFut Exchange ID.
* @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
GridCacheSharedContext cctx,
- int cacheId,
+ int grpId,
GridDhtPartitionsExchangeFuture exchFut,
Object similarAffKey
) {
this.cctx = cctx;
- this.cacheId = cacheId;
+ this.grpId = grpId;
this.similarAffKey = similarAffKey;
topVer = exchFut.topologyVersion();
@@ -166,8 +166,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public int cacheId() {
- return cacheId;
+ @Override public int groupId() {
+ return grpId;
}
/** {@inheritDoc} */
@@ -281,7 +281,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+ if (oldest.id().equals(loc.id())) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -353,8 +353,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
- return localPartition(1, AffinityTopologyVersion.NONE, create);
+ @Override public GridDhtLocalPartition localPartition(int p) {
+ return localPartition(p, AffinityTopologyVersion.NONE, false);
}
/** {@inheritDoc} */
@@ -997,7 +997,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
- ", cacheId=" + cacheId + ']');
+ ", grpId=" + grpId + ']');
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 2ee6f83..36b501b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -96,9 +96,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/** */
private static final long serialVersionUID = 0L;
- /** Preloader. */
- protected GridCachePreloader preldr;
-
/** Multi tx future holder. */
private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>();
@@ -157,7 +154,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param ctx Context.
*/
protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
- this(ctx, new GridCachePartitionedConcurrentMap(ctx));
+ this(ctx, new GridCachePartitionedConcurrentMap(ctx.group()));
}
/**
@@ -182,43 +179,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/** {@inheritDoc} */
- @Override public void stop() {
- super.stop();
-
- if (preldr != null)
- preldr.stop();
-
- // Clean up to help GC.
- preldr = null;
- }
-
- /** {@inheritDoc} */
@Override public void onReconnected() {
super.onReconnected();
ctx.affinity().onReconnected();
// TODO IGNITE-5075.
- //top.onReconnected();
-
- if (preldr != null)
- preldr.onReconnected();
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- if (preldr != null)
- preldr.onKernalStart();
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop() {
- super.onKernalStop();
-
- if (preldr != null)
- preldr.onKernalStop();
+// top.onReconnected();
+//
+// if (preldr != null)
+// preldr.onReconnected();
}
/** {@inheritDoc} */
@@ -259,16 +229,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/** {@inheritDoc} */
@Override public GridCachePreloader preloader() {
- return preldr;
- }
-
- /**
- * @return DHT preloader.
- */
- public GridDhtPreloader dhtPreloader() {
- assert preldr instanceof GridDhtPreloader;
-
- return (GridDhtPreloader)preldr;
+ return ctx.group().preloader();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 7bc17a1..8031c8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -166,7 +166,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
* Initializes future.
*/
void init() {
- GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
+ GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, keys.keySet(), topVer);
if (fut != null) {
if (!F.isEmpty(fut.invalidPartitions())) {
@@ -292,9 +292,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
*/
private boolean map(KeyCacheObject key) {
try {
+ int keyPart = cctx.affinity().partition(key);
+
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
- cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
- cache().topology().localPartition(key, false);
+ cache().topology().localPartition(keyPart, topVer, true) :
+ cache().topology().localPartition(keyPart);
if (part == null)
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 9cc69b5..8860b5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -197,8 +197,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
*
*/
private void map() {
- if (cctx.dht().dhtPreloader().needForceKeys()) {
- GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+ if (cctx.group().preloader().needForceKeys()) {
+ GridDhtFuture<Object> fut = cctx.group().preloader().request(
+ cctx,
Collections.singleton(key),
topVer);
@@ -268,9 +269,11 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
* @return {@code True} if mapped.
*/
private boolean map(KeyCacheObject key) {
+ int keyPart = cctx.affinity().partition(key);
+
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
- cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
- cache().topology().localPartition(key, false);
+ cache().topology().localPartition(keyPart, topVer, true) :
+ cache().topology().localPartition(keyPart);
if (part == null)
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 5425954..62fe24f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -30,16 +31,19 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
@@ -101,7 +105,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
private final GridFutureAdapter<?> rent;
/** Context. */
- private final GridCacheContext cctx;
+ private final GridCacheSharedContext ctx;
+
+ /** */
+ private final CacheGroupInfrastructure grp;
/** Create time. */
@GridToStringExclude
@@ -133,18 +140,22 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
private volatile boolean shouldBeRenting;
/**
- * @param cctx Context.
+ * @param ctx Context.
* @param id Partition ID.
* @param entryFactory Entry factory.
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
- GridDhtLocalPartition(GridCacheContext cctx, int id, GridCacheMapEntryFactory entryFactory) {
- super(cctx, entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions()));
+ GridDhtLocalPartition(GridCacheSharedContext ctx,
+ CacheGroupInfrastructure grp,
+ int id,
+ GridCacheMapEntryFactory entryFactory) {
+ super(entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()));
this.id = id;
- this.cctx = cctx;
+ this.ctx = ctx;
+ this.grp = grp;
- log = U.logger(cctx.kernalContext(), logRef, this);
+ log = U.logger(ctx.kernalContext(), logRef, this);
rent = new GridFutureAdapter<Object>() {
@Override public String toString() {
@@ -152,15 +163,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
};
- int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
- Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
+ // TODO IGNITE-5075.
+ int delQueueSize = CU.isSystemCache(grp.name()) ? 100 :
+ Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
rmvQueueMaxSize = U.ceilPow2(delQueueSize);
rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
try {
- store = cctx.offheap().createCacheDataStore(id);
+ store = grp.offheap().createCacheDataStore(id);
}
catch (IgniteCheckedException e) {
// TODO ignite-db
@@ -235,7 +247,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if partition is empty.
*/
public boolean isEmpty() {
- if (cctx.allowFastEviction())
+ if (grp.allowFastEviction())
return size() == 0;
return store.size() == 0 && size() == 0;
@@ -307,6 +319,17 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
+ * @param key Key.
+ * @param ver Version.
+ */
+ private void removeVersionedEntry(KeyCacheObject key, GridCacheVersion ver) {
+ GridCacheMapEntry entry = getEntry(null, key);
+
+ if (entry != null && entry.markObsoleteVersion(ver))
+ removeEntry(entry);
+ }
+
+ /**
*
*/
public void cleanupRemoveQueue() {
@@ -314,10 +337,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
RemovedEntryHolder item = rmvQueue.pollFirst();
if (item != null)
- cctx.dht().removeVersionedEntry(item.key(), item.version());
+ removeVersionedEntry(item.key(), item.version());
}
- if (!cctx.isDrEnabled()) {
+ if (!grp.isDrEnabled()) {
RemovedEntryHolder item = rmvQueue.peekFirst();
while (item != null && item.expireTime() < U.currentTimeMillis()) {
@@ -326,7 +349,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
if (item == null)
break;
- cctx.dht().removeVersionedEntry(item.key(), item.version());
+ removeVersionedEntry(item.key(), item.version());
item = rmvQueue.peekFirst();
}
@@ -486,13 +509,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code true} if cas succeeds.
*/
private boolean casState(long state, GridDhtPartitionState toState) {
- if (cctx.shared().database().persistenceEnabled()) {
+ if (ctx.database().persistenceEnabled()) {
synchronized (this) {
boolean update = this.state.compareAndSet(state, setPartState(state, toState));
if (update)
try {
- cctx.shared().wal().log(new PartitionMetaStateRecord(cctx.cacheId(), id, toState, updateCounter()));
+ // TODO IGNITE-5075.
+ ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
}
catch (IgniteCheckedException e) {
log.error("Error while writing to log", e);
@@ -610,13 +634,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @param updateSeq Update sequence.
*/
void tryEvictAsync(boolean updateSeq) {
- assert cctx.kernalContext().state().active();
+ assert ctx.kernalContext().state().active();
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
- if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && getSize(state) == 0 &&
+ if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 &&
partState == RENTING && getReservations(state) == 0 && !groupReserved() &&
casState(state, EVICTED)) {
if (log.isDebugEnabled())
@@ -626,7 +650,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
finishDestroy(updateSeq);
}
else if (partState == RENTING || shouldBeRenting())
- cctx.preloader().evictPartitionAsync(this);
+ grp.preloader().evictPartitionAsync(this);
}
/**
@@ -702,18 +726,19 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
assert state() == EVICTED : this;
assert evictGuard.get() == -1;
- if (cctx.isDrEnabled())
- cctx.dr().partitionEvicted(id);
-
- cctx.continuousQueries().onPartitionEvicted(id);
-
- cctx.dataStructures().onPartitionEvicted(id);
+// TODO IGNITE-5075.
+// if (cctx.isDrEnabled())
+// cctx.dr().partitionEvicted(id);
+//
+// cctx.continuousQueries().onPartitionEvicted(id);
+//
+// cctx.dataStructures().onPartitionEvicted(id);
destroyCacheDataStore();
rent.onDone();
- ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
+ ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, updateSeq);
clearDeferredDeletes();
}
@@ -753,7 +778,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
try {
CacheDataStore store = dataStore();
- cctx.offheap().destroyCacheDataStore(id, store);
+ grp.offheap().destroyCacheDataStore(id, store);
}
catch (IgniteCheckedException e) {
log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e);
@@ -777,7 +802,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if local node is primary for this partition.
*/
public boolean primary(AffinityTopologyVersion topVer) {
- return cctx.affinity().primaryByPartition(cctx.localNode(), id, topVer);
+ List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+ return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0));
}
/**
@@ -785,7 +812,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if local node is backup for this partition.
*/
public boolean backup(AffinityTopologyVersion topVer) {
- return cctx.affinity().backupByPartition(cctx.localNode(), id, topVer);
+ List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+ return nodes.indexOf(ctx.localNode()) > 0;
}
/**
@@ -829,9 +858,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @throws NodeStoppingException If node stopping.
*/
public void clearAll() throws NodeStoppingException {
- GridCacheVersion clearVer = cctx.versions().next();
+ GridCacheVersion clearVer = ctx.versions().next();
- boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
+ // TODO IGNITE-5075.
+ boolean rec = grp.shared().gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
Iterator<GridCacheMapEntry> it = allEntries().iterator();
@@ -840,7 +870,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
while (it.hasNext()) {
GridCacheMapEntry cached = null;
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
try {
cached = it.next();
@@ -850,20 +880,21 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
if (!cached.isInternal()) {
if (rec) {
- cctx.events().addEvent(cached.partition(),
- cached.key(),
- cctx.localNodeId(),
- (IgniteUuid)null,
- null,
- EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
- null,
- false,
- cached.rawGet(),
- cached.hasValue(),
- null,
- null,
- null,
- false);
+ // TODO IGNITE-5075.
+// cctx.events().addEvent(cached.partition(),
+// cached.key(),
+// ctx.localNodeId(),
+// (IgniteUuid)null,
+// null,
+// EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+// null,
+// false,
+// cached.rawGet(),
+// cached.hasValue(),
+// null,
+// null,
+// null,
+// false);
}
}
}
@@ -885,28 +916,34 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
}
- if (!cctx.allowFastEviction()) {
+ if (!grp.allowFastEviction()) {
+ GridCacheContext cctx = grp.cacheContext();
+
try {
- GridIterator<CacheDataRow> it0 = cctx.offheap().iterator(id);
+ GridIterator<CacheDataRow> it0 = grp.offheap().iterator(id);
while (it0.hasNext()) {
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
try {
CacheDataRow row = it0.next();
- GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(),
- row.key(), null, true, false);
+ GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx,
+ grp.affinity().lastVersion(),
+ row.key(),
+ null,
+ true,
+ false);
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
if (rec) {
cctx.events().addEvent(cached.partition(),
cached.key(),
- cctx.localNodeId(),
+ ctx.localNodeId(),
(IgniteUuid)null,
null,
EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
@@ -927,7 +964,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
break; // Partition is already concurrently cleared and evicted.
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
}
}
@@ -950,7 +987,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
*/
private void clearDeferredDeletes() {
for (RemovedEntryHolder e : rmvQueue)
- cctx.dht().removeVersionedEntry(e.key(), e.version());
+ removeVersionedEntry(e.key(), e.version());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index cf12986..9617a0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -140,7 +140,7 @@ public interface GridDhtPartitionTopology {
* @throws GridDhtInvalidPartitionException If partition is evicted or absent and
* does not belong to this node.
*/
- @Nullable public GridDhtLocalPartition localPartition(Object key, boolean create)
+ @Nullable public GridDhtLocalPartition localPartition(int part)
throws GridDhtInvalidPartitionException;
/**
[4/4] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0096266b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0096266b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0096266b
Branch: refs/heads/ignite-5075
Commit: 0096266b5158b607f090e947042c2f1c473cb580
Parents: e6ebae1
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 5 12:33:06 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 5 17:47:35 2017 +0300
----------------------------------------------------------------------
.../cache/CacheGroupInfrastructure.java | 174 ++++++++++++-
.../processors/cache/ExchangeActions.java | 6 +-
.../processors/cache/GridCacheAdapter.java | 19 +-
.../cache/GridCacheClearAllRunnable.java | 2 +-
.../cache/GridCacheConcurrentMap.java | 3 +-
.../cache/GridCacheConcurrentMapImpl.java | 19 +-
.../processors/cache/GridCacheContext.java | 40 +--
.../cache/GridCacheLocalConcurrentMap.java | 13 +-
.../processors/cache/GridCacheMapEntry.java | 18 +-
.../GridCachePartitionExchangeManager.java | 76 +++---
.../processors/cache/GridCachePreloader.java | 20 +-
.../cache/GridCachePreloaderAdapter.java | 40 ++-
.../processors/cache/GridCacheProcessor.java | 85 +++---
.../processors/cache/GridNoStorageCacheMap.java | 14 +-
.../cache/IgniteCacheOffheapManager.java | 32 ++-
.../cache/IgniteCacheOffheapManagerImpl.java | 260 ++++++++++---------
.../cache/database/CacheDataRowAdapter.java | 20 +-
.../processors/cache/database/RowStore.java | 16 +-
.../dht/GridCachePartitionedConcurrentMap.java | 43 +--
.../dht/GridClientPartitionTopology.java | 20 +-
.../distributed/dht/GridDhtCacheAdapter.java | 51 +---
.../cache/distributed/dht/GridDhtGetFuture.java | 8 +-
.../distributed/dht/GridDhtGetSingleFuture.java | 11 +-
.../distributed/dht/GridDhtLocalPartition.java | 145 +++++++----
.../dht/GridDhtPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopologyImpl.java | 143 +++++-----
.../dht/GridDhtTransactionalCacheAdapter.java | 6 +-
.../dht/colocated/GridDhtColocatedCache.java | 2 +-
.../dht/preloader/GridDhtPartitionDemander.java | 179 +++++++------
.../dht/preloader/GridDhtPartitionSupplier.java | 45 ++--
.../GridDhtPartitionSupplyMessage.java | 26 --
.../GridDhtPartitionsAbstractMessage.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 47 ++--
.../preloader/GridDhtPartitionsFullMessage.java | 31 ++-
.../dht/preloader/GridDhtPreloader.java | 97 ++++---
.../distributed/near/GridNearCacheAdapter.java | 1 -
.../cache/GridCacheTtlManagerSelfTest.java | 3 +-
.../IgniteTxStoreExceptionAbstractSelfTest.java | 4 +-
.../expiry/IgniteCacheTtlCleanupSelfTest.java | 2 +-
.../TxOptimisticDeadlockDetectionTest.java | 2 +-
.../TxPessimisticDeadlockDetectionTest.java | 2 +-
.../loadtests/hashmap/GridCacheTestContext.java | 3 -
42 files changed, 949 insertions(+), 785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 57e560f..7051547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -26,11 +26,16 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.database.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
+import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -64,10 +69,37 @@ public class CacheGroupInfrastructure {
private GridDhtPartitionTopologyImpl top;
/** */
- private AffinityTopologyVersion grpStartVer;
+ private final AffinityTopologyVersion grpStartVer;
/** */
- private AffinityTopologyVersion locStartVer;
+ private final AffinityTopologyVersion locStartVer;
+
+ /** */
+ private IgniteCacheOffheapManager offheapMgr;
+
+ /** Preloader. */
+ private GridCachePreloader preldr;
+
+ /** */
+ private final boolean affNode;
+
+ /** Memory policy. */
+ private final MemoryPolicy memPlc;
+
+ /** */
+ private final CacheObjectContext cacheObjCtx;
+
+ /** FreeList instance this group is associated with. */
+ private final FreeList freeList;
+
+ /** ReuseList instance this group is associated with. */
+ private final ReuseList reuseList;
+
+ /** */
+ private final CacheType cacheType;
+
+ /** IO policy. */
+ private final byte ioPlc;
/**
* @param grpId Group ID.
@@ -76,21 +108,133 @@ public class CacheGroupInfrastructure {
*/
CacheGroupInfrastructure(GridCacheSharedContext ctx,
int grpId,
+ CacheType cacheType,
CacheConfiguration ccfg,
+ boolean affNode,
+ MemoryPolicy memPlc,
+ CacheObjectContext cacheObjCtx,
+ FreeList freeList,
+ ReuseList reuseList,
AffinityTopologyVersion grpStartVer,
AffinityTopologyVersion locStartVer) {
assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';
assert ccfg != null;
this.grpId = grpId;
+ this.cacheType = cacheType;
this.ctx = ctx;
this.ccfg = ccfg;
+ this.affNode = affNode;
+ this.memPlc = memPlc;
+ this.cacheObjCtx = cacheObjCtx;
+ this.freeList = freeList;
+ this.reuseList = reuseList;
this.grpStartVer = grpStartVer;
this.locStartVer = locStartVer;
+ ioPlc = cacheType.ioPolicy();
+
log = ctx.kernalContext().log(getClass());
}
+ public GridCachePreloader preloader() {
+ return preldr;
+ }
+
+ /**
+ * @return IO policy for the given cache group.
+ */
+ public byte ioPolicy() {
+ return ioPlc;
+ }
+
+ /** */
+ private GridCacheContext singleCacheCtx;
+
+ public void cacheContext(GridCacheContext singleCacheCtx) {
+ assert !sharedGroup();
+
+ this.singleCacheCtx = singleCacheCtx;
+ }
+
+ public GridCacheContext cacheContext() {
+ assert !sharedGroup();
+
+ return singleCacheCtx;
+ }
+
+ // TODO IGNITE-5075: need separate caches with/without queries?
+ public boolean queriesEnabled() {
+ return QueryUtils.isEnabled(ccfg);
+ }
+
+ public boolean started() {
+ return true; // TODO IGNITE-5075.
+ }
+
+ /**
+ * @return Free List.
+ */
+ public FreeList freeList() {
+ return freeList;
+ }
+
+ /**
+ * @return Reuse List.
+ */
+ public ReuseList reuseList() {
+ return reuseList;
+ }
+
+ /**
+ * TODO IGNITE-5075: get rid of CacheObjectContext?
+ */
+ public CacheObjectContext cacheObjectContext() {
+ return cacheObjCtx;
+ }
+
+ public GridCacheSharedContext shared() {
+ return ctx;
+ }
+
+ /**
+ * @return Memory policy.
+ */
+ public MemoryPolicy memoryPolicy() {
+ return memPlc;
+ }
+
+ public boolean affinityNode() {
+ return affNode;
+ }
+
+ public IgniteCacheOffheapManager offheap() {
+ return offheapMgr;
+ }
+
+ /** Flag indicating that this cache is in a recovery mode. */
+ // TODO IGNITE-5075 see GridCacheContext#needsRecovery
+ private boolean needsRecovery;
+
+ /**
+ * @return {@code True} if this cache is in a recovery mode. The flag must only be modified during exchange.
+ */
+ public boolean needsRecovery() {
+ return needsRecovery;
+ }
+
+ /**
+ * @param needsRecovery Needs recovery flag.
+ */
+ public void needsRecovery(boolean needsRecovery) {
+ this.needsRecovery = needsRecovery;
+ }
+
+ public boolean allowFastEviction() {
+ // TODO IGNITE-5075 see GridCacheContext#allowFastEviction
+ return true;
+ }
+
public AffinityTopologyVersion groupStartVersion() {
return grpStartVer;
}
@@ -126,6 +270,16 @@ public class CacheGroupInfrastructure {
return ccfg.getGroupName() != null;
}
+ // TODO IGNITE-5075.
+ public boolean isDrEnabled() {
+ return false;
+ }
+
+ public void onKernalStop() {
+ if (preldr != null)
+ preldr.onKernalStop();
+ }
+
public void start() throws IgniteCheckedException {
aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
name(),
@@ -148,7 +302,7 @@ public class CacheGroupInfrastructure {
}
};
- top = new GridDhtPartitionTopologyImpl(ctx, entryFactory);
+ top = new GridDhtPartitionTopologyImpl(ctx, this, entryFactory);
if (!ctx.kernalContext().clientNode()) {
ctx.io().addHandler(groupId(), GridDhtAffinityAssignmentRequest.class,
@@ -158,8 +312,17 @@ public class CacheGroupInfrastructure {
}
});
}
+
+ preldr = new GridDhtPreloader(this);
+
+ preldr.start();
}
+ // TODO IGNITE-5075 get from plugin.
+ offheapMgr = new IgniteCacheOffheapManagerImpl();
+
+ offheapMgr.start(ctx, this);
+
ctx.affinity().onCacheGroupCreated(this);
}
@@ -244,11 +407,14 @@ public class CacheGroupInfrastructure {
if (top != null)
top.onReconnected();
+
+ if (preldr != null)
+ preldr.onReconnected();
}
public GridDhtPartitionTopology topology() {
if (top == null)
- throw new IllegalStateException("Topology is not initialized: " + groupName());
+ throw new IllegalStateException("Topology is not initialized: " + name());
return top;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 383f20d..8c9833f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -292,7 +292,7 @@ public class ExchangeActions {
cacheGrpsToStart.add(grpDesc);
}
- List<CacheGroupDescriptor> cacheGroupsToStart() {
+ public List<CacheGroupDescriptor> cacheGroupsToStart() {
return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupDescriptor>emptyList();
}
@@ -303,11 +303,11 @@ public class ExchangeActions {
cacheGrpsToStop.add(grpDesc);
}
- List<CacheGroupDescriptor> cacheGroupsToStop() {
+ public List<CacheGroupDescriptor> cacheGroupsToStop() {
return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupDescriptor>emptyList();
}
- boolean cacheGroupStopping(int grpId) {
+ public boolean cacheGroupStopping(int grpId) {
if (cacheGrpsToStop != null) {
for (CacheGroupDescriptor grpToStop : cacheGrpsToStop) {
if (grpToStop.groupId() == grpId)
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 0b1ab74..0d30ac8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -560,9 +560,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @throws IgniteCheckedException If start failed.
*/
public void start() throws IgniteCheckedException {
- if (map == null) {
- map = new GridCacheLocalConcurrentMap(ctx, entryFactory(), DFLT_START_CACHE_SIZE);
- }
+ // TODO: IGNITE-5075: make abstract?
+ if (map == null)
+ map = new GridCacheLocalConcurrentMap(entryFactory(), DFLT_START_CACHE_SIZE);
}
/**
@@ -717,7 +717,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteCacheOffheapManager offheapMgr = ctx.isNear() ? ctx.near().dht().context().offheap() : ctx.offheap();
- its.add(offheapMgr.<K, V>entriesIterator(modes.primary, modes.backup, topVer, ctx.keepBinary()));
+ its.add(offheapMgr.<K, V>entriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary()));
}
}
else if (modes.heap) {
@@ -944,7 +944,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Entry (never {@code null}).
*/
public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) {
- GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, null, true, false);
+ GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(ctx, topVer, key, null, true, false);
assert e != null;
@@ -960,10 +960,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@Nullable private GridCacheEntryEx entry0(KeyCacheObject key, AffinityTopologyVersion topVer, boolean create,
boolean touch) {
- GridCacheMapEntry cur = map.getEntry(key);
+ GridCacheMapEntry cur = map.getEntry(ctx, key);
if (cur == null || cur.obsolete()) {
cur = map.putEntryIfObsoleteOrAbsent(
+ ctx,
topVer,
key,
null,
@@ -1071,7 +1072,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public final void removeIfObsolete(KeyCacheObject key) {
assert key != null;
- GridCacheMapEntry entry = map.getEntry(key);
+ GridCacheMapEntry entry = map.getEntry(ctx, key);
if (entry != null && entry.obsolete())
removeEntry(entry);
@@ -6518,7 +6519,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public boolean contains(Object o) {
- GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o));
+ GridCacheMapEntry entry = map.getEntry(ctx, ctx.toCacheKeyObject(o));
return entry != null && internalSet.contains(entry);
}
@@ -6608,7 +6609,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public boolean contains(Object o) {
- GridCacheMapEntry entry = map.getEntry(ctx.toCacheKeyObject(o));
+ GridCacheMapEntry entry = map.getEntry(ctx, ctx.toCacheKeyObject(o));
return entry != null && internalSet.contains(entry);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
index df19225..ca89650 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
@@ -82,7 +82,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable {
if (!ctx.isNear()) {
if (id == 0)
- ctx.offheap().clear(readers);
+ ctx.offheap().clear(ctx, readers);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index 9378f74..e00b9f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -33,7 +33,7 @@ public interface GridCacheConcurrentMap {
* @param key Key.
* @return Entry.
*/
- @Nullable public GridCacheMapEntry getEntry(KeyCacheObject key);
+ @Nullable public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key);
/**
* @param topVer Topology version.
@@ -45,6 +45,7 @@ public interface GridCacheConcurrentMap {
* couldn't be created.
*/
@Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent(
+ GridCacheContext ctx,
AffinityTopologyVersion topVer,
KeyCacheObject key,
@Nullable CacheObject val,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
index 76d961a..d569005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -49,14 +49,10 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
/** Map entry factory. */
private final GridCacheMapEntryFactory factory;
- /** Cache context. */
- private final GridCacheContext ctx;
-
/**
* Creates a new, empty map with the specified initial
* capacity.
*
- * @param ctx Cache context.
* @param factory Entry factory.
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements.
@@ -64,15 +60,14 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
* @throws IllegalArgumentException if the initial capacity is
* negative.
*/
- public GridCacheConcurrentMapImpl(GridCacheContext ctx, GridCacheMapEntryFactory factory, int initialCapacity) {
- this(ctx, factory, initialCapacity, DFLT_LOAD_FACTOR, DFLT_CONCUR_LEVEL);
+ public GridCacheConcurrentMapImpl(GridCacheMapEntryFactory factory, int initialCapacity) {
+ this(factory, initialCapacity, DFLT_LOAD_FACTOR, DFLT_CONCUR_LEVEL);
}
/**
* Creates a new, empty map with the specified initial
* capacity, load factor and concurrency level.
*
- * @param ctx Cache context.
* @param factory Entry factory.
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements.
@@ -87,25 +82,25 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
* non-positive.
*/
public GridCacheConcurrentMapImpl(
- GridCacheContext ctx,
GridCacheMapEntryFactory factory,
int initialCapacity,
float loadFactor,
int concurrencyLevel
) {
- this.ctx = ctx;
this.factory = factory;
map = new ConcurrentHashMap8<>(initialCapacity, loadFactor, concurrencyLevel);
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
+ @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) {
return map.get(key);
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(final AffinityTopologyVersion topVer,
+ @Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(
+ GridCacheContext ctx,
+ final AffinityTopologyVersion topVer,
KeyCacheObject key,
@Nullable final CacheObject val,
final boolean create,
@@ -273,6 +268,8 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
boolean removed = map.remove(entry.key(), entry);
if (removed) {
+ GridCacheContext ctx = entry.context();
+
if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED))
// Event notification.
ctx.events().addEvent(entry.partition(), entry.key(), ctx.localNodeId(), (IgniteUuid)null, null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 7a4ad33..a6b2bcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -135,15 +135,6 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Cache shared context. */
private GridCacheSharedContext<K, V> sharedCtx;
- /** Memory policy. */
- private MemoryPolicy memPlc;
-
- /** FreeList instance this cache is associated with. */
- private FreeList freeList;
-
- /** ReuseList instance this cache is associated with */
- private ReuseList reuseList;
-
/** Logger. */
private IgniteLogger log;
@@ -177,9 +168,6 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Replication manager. */
private GridCacheDrManager drMgr;
- /** */
- private IgniteCacheOffheapManager offheapMgr;
-
/** Conflict resolver manager. */
private CacheConflictResolutionManager rslvrMgr;
@@ -273,8 +261,6 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param sharedCtx Cache shared context.
* @param cacheCfg Cache configuration.
* @param cacheType Cache type.
- * @param memPlc MemoryPolicy instance.
- * @param freeList FreeList instance.
* @param affNode {@code True} if local node is affinity node.
* @param updatesAllowed Updates allowed flag.
* @param evtMgr Cache event manager.
@@ -300,9 +286,6 @@ public class GridCacheContext<K, V> implements Externalizable {
AffinityTopologyVersion locStartTopVer,
boolean affNode,
boolean updatesAllowed,
- MemoryPolicy memPlc,
- FreeList freeList,
- ReuseList reuseList,
/*
* Managers in starting order!
@@ -352,10 +335,6 @@ public class GridCacheContext<K, V> implements Externalizable {
this.updatesAllowed = updatesAllowed;
this.depEnabled = ctx.deploy().enabled() && !cacheObjects().isBinaryEnabled(cacheCfg);
- this.memPlc = memPlc;
- this.freeList = freeList;
- this.reuseList = reuseList;
-
/*
* Managers in starting order!
* ===========================
@@ -368,7 +347,6 @@ public class GridCacheContext<K, V> implements Externalizable {
this.dataStructuresMgr = add(dataStructuresMgr);
this.ttlMgr = add(ttlMgr);
this.drMgr = add(drMgr);
- this.offheapMgr = add(offheapMgr);
this.rslvrMgr = add(rslvrMgr);
this.pluginMgr = add(pluginMgr);
this.affMgr = add(affMgr);
@@ -734,21 +712,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Memory policy.
*/
public MemoryPolicy memoryPolicy() {
- return memPlc;
- }
-
- /**
- * @return Free List.
- */
- public FreeList freeList() {
- return freeList;
- }
-
- /**
- * @return Reuse List.
- */
- public ReuseList reuseList() {
- return reuseList;
+ return grp.memoryPolicy();
}
/**
@@ -1106,7 +1070,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Offheap manager.
*/
public IgniteCacheOffheapManager offheap() {
- return offheapMgr;
+ return grp.offheap();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
index db99272..50488bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
@@ -27,14 +27,15 @@ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl {
/** */
private final AtomicInteger pubSize = new AtomicInteger();
- public GridCacheLocalConcurrentMap(GridCacheContext ctx,
- GridCacheMapEntryFactory factory, int initialCapacity) {
- super(ctx, factory, initialCapacity);
+ public GridCacheLocalConcurrentMap(GridCacheMapEntryFactory factory, int initialCapacity) {
+ super(factory, initialCapacity);
}
- public GridCacheLocalConcurrentMap(GridCacheContext ctx,
- GridCacheMapEntryFactory factory, int initialCapacity, float loadFactor, int concurrencyLevel) {
- super(ctx, factory, initialCapacity, loadFactor, concurrencyLevel);
+ public GridCacheLocalConcurrentMap(GridCacheMapEntryFactory factory,
+ int initialCapacity,
+ float loadFactor,
+ int concurrencyLevel) {
+ super(factory, initialCapacity, loadFactor, concurrencyLevel);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 21c58fa..8510709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1690,7 +1690,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
c.call(dataRow);
}
else
- cctx.offheap().invoke(key, localPartition(), c);
+ cctx.offheap().invoke(cctx, key, localPartition(), c);
GridCacheUpdateAtomicResult updateRes = c.updateRes;
@@ -3238,7 +3238,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert Thread.holdsLock(this);
assert val != null : "null values in update for key: " + key;
- cctx.offheap().invoke(key, localPartition(), new UpdateClosure(this, val, ver, expireTime));
+ cctx.offheap().invoke(cctx, key, localPartition(), new UpdateClosure(this, val, ver, expireTime));
}
/**
@@ -3280,7 +3280,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
protected void removeValue() throws IgniteCheckedException {
assert Thread.holdsLock(this);
- cctx.offheap().remove(key, partition(), localPartition());
+ cctx.offheap().remove(cctx, key, partition(), localPartition());
}
/** {@inheritDoc} */
@@ -3934,7 +3934,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (oldRow != null)
oldRow.key(entry.key);
- newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(entry.key,
+ newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(
+ entry.cctx,
+ entry.key,
val,
ver,
expireTime,
@@ -4286,7 +4288,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (needUpdate) {
- newRow = entry.localPartition().dataStore().createRow(entry.key,
+ newRow = entry.localPartition().dataStore().createRow(
+ entry.cctx,
+ entry.key,
storeLoadedVal,
newVer,
entry.expireTimeExtras(),
@@ -4425,7 +4429,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
if (!entry.isNear()) {
- newRow = entry.localPartition().dataStore().createRow(entry.key,
+ newRow = entry.localPartition().dataStore().createRow(
+ entry.cctx,
+ entry.key,
updated,
newVer,
newExpireTime,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 8f52ae6..b6dcf33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -889,37 +889,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
- cctx.forAllCaches(new IgniteInClosure<GridCacheContext>() {
- @Override public void apply(GridCacheContext cacheCtx) {
- if (!cacheCtx.isLocal()) {
- boolean ready;
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (!grp.isLocal()) {
+ boolean ready;
- if (exchId != null) {
- AffinityTopologyVersion startTopVer = cacheCtx.cacheStartTopologyVersion();
+ if (exchId != null) {
+ AffinityTopologyVersion startTopVer = grp.groupStartVersion();
- ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
- }
- else
- ready = cacheCtx.started();
+ ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+ }
+ else
+ ready = grp.started();
- if (ready) {
- GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache();
+ if (ready) {
+ GridAffinityAssignmentCache affCache = grp.affinity();
- GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+ GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
- addFullPartitionsMap(m,
- dupData,
- compress,
- cacheCtx.cacheId(),
- locMap,
- affCache.similarAffinityKey());
+ addFullPartitionsMap(m,
+ dupData,
+ compress,
+ grp.groupId(),
+ locMap,
+ affCache.similarAffinityKey());
- if (exchId != null)
- m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
- }
+ if (exchId != null)
+ m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
}
}
- });
+ }
// It is important that client topologies be added after contexts.
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
@@ -928,12 +926,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
addFullPartitionsMap(m,
dupData,
compress,
- top.cacheId(),
+ top.groupId(),
map,
top.similarAffinityKey());
if (exchId != null)
- m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+ m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true));
}
return m;
@@ -943,19 +941,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param m Message.
* @param dupData Duplicated data map.
* @param compress {@code True} if need check for duplicated partition state data.
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param map Map to add.
* @param affKey Cache affinity key.
*/
private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
boolean compress,
- Integer cacheId,
+ Integer grpId,
GridDhtPartitionFullMap map,
Object affKey) {
Integer dupDataCache = null;
- if (compress && affKey != null && !m.containsCache(cacheId)) {
+ if (compress && affKey != null && !m.containsGroup(grpId)) {
T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
if (state0 != null && state0.get2().partitionStateEquals(map)) {
@@ -971,10 +969,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
dupDataCache = state0.get1();
}
else
- dupData.put(affKey, new T2<>(cacheId, map));
+ dupData.put(affKey, new T2<>(grpId, map));
}
- m.addFullPartitionsMap(cacheId, map, dupDataCache);
+ m.addFullPartitionsMap(grpId, map, dupDataCache);
}
/**
@@ -1022,24 +1020,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (!cacheCtx.isLocal()) {
- GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap();
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (!grp.isLocal()) {
+ GridDhtPartitionMap locMap = grp.topology().localPartitionMap();
addPartitionMap(m,
dupData,
true,
- cacheCtx.cacheId(),
+ grp.groupId(),
locMap,
- cacheCtx.affinity().affinityCache().similarAffinityKey());
+ grp.affinity().similarAffinityKey());
if (sndCounters)
- m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+ m.partitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
}
}
for (GridClientPartitionTopology top : clientTops.values()) {
- if (m.partitions() != null && m.partitions().containsKey(top.cacheId()))
+ if (m.partitions() != null && m.partitions().containsKey(top.groupId()))
continue;
GridDhtPartitionMap locMap = top.localPartitionMap();
@@ -1047,12 +1045,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
addPartitionMap(m,
dupData,
true,
- top.cacheId(),
+ top.groupId(),
locMap,
top.similarAffinityKey());
if (sndCounters)
- m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+ m.partitionUpdateCounters(top.groupId(), top.updateCounters(true));
}
return m;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 5ae68e8..9428d9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -45,18 +46,6 @@ public interface GridCachePreloader {
public void start() throws IgniteCheckedException;
/**
- * Stops preloading.
- */
- public void stop();
-
- /**
- * Kernal start callback.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void onKernalStart() throws IgniteCheckedException;
-
- /**
* Kernal stop callback.
*/
public void onKernalStop();
@@ -140,7 +129,9 @@ public interface GridCachePreloader {
* @param topVer Topology version, {@code -1} if not required.
* @return Future to complete when all keys are preloaded.
*/
- public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer);
+ public GridDhtFuture<Object> request(GridCacheContext cctx,
+ Collection<KeyCacheObject> keys,
+ AffinityTopologyVersion topVer);
/**
* Requests that preloader sends the request for the key.
@@ -149,7 +140,8 @@ public interface GridCachePreloader {
* @param topVer Topology version, {@code -1} if not required.
* @return Future to complete when all keys are preloaded.
*/
- public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+ public GridDhtFuture<Object> request(GridCacheContext cctx,
+ GridNearAtomicAbstractUpdateRequest req,
AffinityTopologyVersion topVer);
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 47c37f5..db1a2e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -39,15 +39,15 @@ import org.jetbrains.annotations.Nullable;
* Adapter for preloading which always assumes that preloading finished.
*/
public class GridCachePreloaderAdapter implements GridCachePreloader {
- /** Cache context. */
- protected final GridCacheContext<?, ?> cctx;
+ /** Cache group. */
+ protected final CacheGroupInfrastructure grp;
+
+ /** Cache shared context. */
+ protected final GridCacheSharedContext ctx;
/** Logger. */
protected final IgniteLogger log;
- /** Affinity. */
- protected final AffinityFunction aff;
-
/** Start future (always completed by default). */
private final IgniteInternalFuture finFut;
@@ -55,15 +55,16 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
protected IgnitePredicate<GridCacheEntryInfo> preloadPred;
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
*/
- public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
- assert cctx != null;
+ public GridCachePreloaderAdapter(CacheGroupInfrastructure grp) {
+ assert grp != null;
+
+ this.grp = grp;
- this.cctx = cctx;
+ ctx = grp.shared();
- log = cctx.logger(getClass());
- aff = cctx.config().getAffinity();
+ log = ctx.logger(getClass());
finFut = new GridFinishedFuture();
}
@@ -74,16 +75,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void stop() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public void onKernalStop() {
// No-op.
}
@@ -130,7 +121,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
/** {@inheritDoc} */
@Override public void unwindUndeploys() {
- cctx.deploy().unwind(cctx);
+ // TODO IGNITE-5075.
+ // cctx.deploy().unwind(cctx);
}
/** {@inheritDoc} */
@@ -144,13 +136,13 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys,
+ @Override public IgniteInternalFuture<Object> request(GridCacheContext ctx, Collection<KeyCacheObject> keys,
AffinityTopologyVersion topVer) {
return new GridFinishedFuture<>();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+ @Override public IgniteInternalFuture<Object> request(GridCacheContext ctx, GridNearAtomicAbstractUpdateRequest req,
AffinityTopologyVersion topVer) {
return new GridFinishedFuture<>();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3769274..2238dc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -510,7 +510,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
*/
private List<GridCacheManager> dhtManagers(GridCacheContext ctx) {
return F.asList(ctx.store(), ctx.events(), ctx.evicts(), ctx.queries(), ctx.continuousQueries(),
- ctx.dr(), ctx.offheap());
+ ctx.dr());
}
/**
@@ -522,7 +522,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (ctx.config().getCacheMode() == LOCAL || !isNearEnabled(ctx))
return Collections.emptyList();
else
- return F.asList(ctx.queries(), ctx.continuousQueries(), ctx.store(), ctx.offheap());
+ return F.asList(ctx.queries(), ctx.continuousQueries(), ctx.store());
}
/**
@@ -1403,11 +1403,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
storeMgr.initialize(cfgStore, sesHolders);
- String memPlcName = cfg.getMemoryPolicyName();
-
- MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName);
- FreeList freeList = sharedCtx.database().freeList(memPlcName);
- ReuseList reuseList = sharedCtx.database().reuseList(memPlcName);
GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
ctx,
@@ -1419,9 +1414,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
locStartTopVer,
affNode,
updatesAllowed,
- memPlc,
- freeList,
- reuseList,
/*
* Managers in starting order!
@@ -1491,14 +1483,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
case TRANSACTIONAL: {
cache = cacheCtx.affinityNode() ?
new GridDhtColocatedCache(cacheCtx) :
- new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap());
break;
}
case ATOMIC: {
cache = cacheCtx.affinityNode() ?
new GridDhtAtomicCache(cacheCtx) :
- new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap());
break;
}
@@ -1553,9 +1545,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
locStartTopVer,
affNode,
true,
- memPlc,
- freeList,
- reuseList,
/*
* Managers in starting order!
@@ -1587,7 +1576,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridDhtCache dhtCache = cacheCtx.affinityNode() ?
new GridDhtCache(cacheCtx) :
- new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ new GridDhtCache(cacheCtx, new GridNoStorageCacheMap());
dhtCache.near(near);
@@ -1604,7 +1593,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
GridDhtAtomicCache dhtCache = cacheCtx.affinityNode() ?
new GridDhtAtomicCache(cacheCtx) :
- new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap());
dhtCache.near(near);
@@ -1817,25 +1806,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
) throws IgniteCheckedException {
assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
- String grpName = startCfg.getGroupName();
-
- CacheGroupInfrastructure grp = null;
-
- if (grpName != null) {
- for (CacheGroupInfrastructure grp0 : cacheGrps.values()) {
- if (grp0.sharedGroup() && grpName.equals(grp0.groupName())) {
- grp = grp0;
-
- break;
- }
- }
-
- if (grp == null)
- grp = startCacheGroup(grpDesc, exchTopVer);
- }
- else
- grp = startCacheGroup(grpDesc, exchTopVer);
-
CacheConfiguration ccfg = new CacheConfiguration(startCfg);
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
@@ -1855,6 +1825,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ccfg.setNearConfiguration(reqNearCfg);
}
+ String grpName = startCfg.getGroupName();
+
+ CacheGroupInfrastructure grp = null;
+
+ if (grpName != null) {
+ for (CacheGroupInfrastructure grp0 : cacheGrps.values()) {
+ if (grp0.sharedGroup() && grpName.equals(grp0.name())) {
+ grp = grp0;
+
+ break;
+ }
+ }
+
+ if (grp == null)
+ grp = startCacheGroup(grpDesc, affNode, cacheObjCtx, exchTopVer);
+ }
+ else
+ grp = startCacheGroup(grpDesc, affNode, cacheObjCtx, exchTopVer);
+
GridCacheContext cacheCtx = createCache(ccfg,
grp,
null,
@@ -1865,6 +1854,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
affNode,
true);
+ if (!grp.sharedGroup())
+ grp.cacheContext(cacheCtx);
+
cacheCtx.dynamicDeploymentId(deploymentId);
GridCacheAdapter cache = cacheCtx.cache();
@@ -1878,13 +1870,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
onKernalStart(cache);
}
- private CacheGroupInfrastructure startCacheGroup(CacheGroupDescriptor desc, AffinityTopologyVersion exchTopVer)
+ private CacheGroupInfrastructure startCacheGroup(
+ CacheGroupDescriptor desc,
+ boolean affNode,
+ CacheObjectContext cacheObjCtx,
+ AffinityTopologyVersion exchTopVer)
throws IgniteCheckedException {
- CacheConfiguration ccfg = new CacheConfiguration(desc.config());
+ CacheConfiguration cfg = new CacheConfiguration(desc.config());
+
+ String memPlcName = cfg.getMemoryPolicyName();
+
+ MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName);
+ FreeList freeList = sharedCtx.database().freeList(memPlcName);
+ ReuseList reuseList = sharedCtx.database().reuseList(memPlcName);
CacheGroupInfrastructure grp = new CacheGroupInfrastructure(sharedCtx,
desc.groupId(),
- ccfg,
+ cfg,
+ affNode,
+ memPlc,
+ cacheObjCtx,
+ freeList,
+ reuseList,
desc.startTopologyVersion(),
exchTopVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
index 00827ee..8faf52c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java
@@ -29,23 +29,13 @@ import org.jetbrains.annotations.Nullable;
* Empty cache map that will never store any entries.
*/
public class GridNoStorageCacheMap implements GridCacheConcurrentMap {
- /** Context. */
- private final GridCacheContext ctx;
-
- /**
- * @param ctx Cache context.
- */
- public GridNoStorageCacheMap(GridCacheContext ctx) {
- this.ctx = ctx;
- }
-
/** {@inheritDoc} */
- @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
+ @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) {
return null;
}
/** {@inheritDoc} */
- @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
+ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key,
@Nullable CacheObject val, boolean create, boolean touch) {
if (create)
return new GridDhtCacheEntry(ctx, topVer, key, key.hashCode(), val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9eb5368..64bc51c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -38,7 +38,13 @@ import org.jetbrains.annotations.Nullable;
*
*/
@SuppressWarnings("WeakerAccess")
-public interface IgniteCacheOffheapManager extends GridCacheManager {
+public interface IgniteCacheOffheapManager {
+ public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException;
+
+ public void onKernalStop();
+
+ public void stop(boolean destroy);
+
/**
* Partition counter update callback. May be overridden by plugin-provided subclasses.
*
@@ -119,7 +125,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
* @param c Tree update closure.
* @throws IgniteCheckedException If failed.
*/
- public void invoke(KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c)
+ public void invoke(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c)
throws IgniteCheckedException;
/**
@@ -133,6 +139,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
* @throws IgniteCheckedException If failed.
*/
public void update(
+ GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
@@ -149,6 +156,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
* @throws IgniteCheckedException If failed.
*/
public void remove(
+ GridCacheContext cctx,
KeyCacheObject key,
int partId,
GridDhtLocalPartition part
@@ -194,7 +202,9 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
* @return Entries iterator.
* @throws IgniteCheckedException If failed.
*/
- public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(final boolean primary,
+ public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(
+ GridCacheContext cctx,
+ final boolean primary,
final boolean backup,
final AffinityTopologyVersion topVer,
final boolean keepBinary) throws IgniteCheckedException;
@@ -221,7 +231,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
*
* @param readers {@code True} to clear readers.
*/
- public void clear(boolean readers);
+ public void clear(GridCacheContext cctx, boolean readers);
/**
* @param part Partition.
@@ -327,7 +337,9 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
* @return New row.
* @throws IgniteCheckedException If failed.
*/
- CacheDataRow createRow(KeyCacheObject key,
+ CacheDataRow createRow(
+ GridCacheContext cctx,
+ KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@@ -342,7 +354,9 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
* @param oldRow Old row if available.
* @throws IgniteCheckedException If failed.
*/
- void update(KeyCacheObject key,
+ void update(
+ GridCacheContext cctx,
+ KeyCacheObject key,
int part,
CacheObject val,
GridCacheVersion ver,
@@ -354,21 +368,21 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
* @param c Closure.
* @throws IgniteCheckedException If failed.
*/
- public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
+ public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
/**
* @param key Key.
* @param partId Partition number.
* @throws IgniteCheckedException If failed.
*/
- public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException;
+ public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException;
/**
* @param key Key.
* @return Data row.
* @throws IgniteCheckedException If failed.
*/
- public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException;
+ public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
/**
* @return Data cursor.