You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/05 14:47:42 UTC
[2/4] ignite git commit: ignite-5075
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6634e98..e942b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -549,7 +549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
cntrMap.clear();
// If this is the oldest node.
- if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+ if (oldest != null && (loc.equals(oldest) || grp.localStartVersion().equals(exchId.topologyVersion()))) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -598,6 +598,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
waitForRent();
}
+ private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
+ return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
+ }
+
/** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
treatAllPartAsLoc = false;
@@ -631,7 +635,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (int p = 0; p < num; p++) {
GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
- if (grp.affinity().partitionLocalNode(p, topVer)) {
+ if (partitionLocalNode(p, topVer)) {
// This partition will be created during next topology event,
// which obviously has not happened at this point.
if (locPart == null) {
@@ -724,11 +728,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtLocalPartition loc = locParts.get(p);
if (loc == null || loc.state() == EVICTED) {
- locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+ locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory));
- if (cctx.shared().pageStore() != null) {
+ if (ctx.pageStore() != null) {
try {
- cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+ // TODO IGNITE-5075.
+ ctx.pageStore().onPartitionCreated(grp.groupId(), p);
}
catch (IgniteCheckedException e) {
// TODO ignite-db
@@ -758,7 +763,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionState state = loc != null ? loc.state() : null;
- if (loc != null && state != EVICTED && (state != RENTING || !cctx.allowFastEviction()))
+ if (loc != null && state != EVICTED && (state != RENTING || !grp.allowFastEviction()))
return loc;
if (!create)
@@ -773,7 +778,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
state = loc != null ? loc.state() : null;
- boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
+ boolean belongs = partitionLocalNode(p, topVer);
if (loc != null && state == EVICTED) {
locParts.set(p, loc = null);
@@ -783,7 +788,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
}
- else if (loc != null && state == RENTING && cctx.allowFastEviction())
+ else if (loc != null && state == RENTING && grp.allowFastEviction())
throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted.");
if (loc == null) {
@@ -792,7 +797,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
- locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory));
+ locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p, entryFactory));
if (updateSeq)
this.updateSeq.incrementAndGet();
@@ -807,9 +812,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.writeLock().unlock();
}
- if (created && cctx.shared().pageStore() != null) {
+ if (created && ctx.pageStore() != null) {
try {
- cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p);
+ // TODO IGNITE-5075.
+ ctx.pageStore().onPartitionCreated(grp.groupId(), p);
}
catch (IgniteCheckedException e) {
// TODO ignite-db
@@ -834,8 +840,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
- return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
+ @Override public GridDhtLocalPartition localPartition(int part) {
+ return locParts.get(part);
}
/** {@inheritDoc} */
@@ -891,7 +897,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
map.put(i, part.state());
}
- return new GridDhtPartitionMap(cctx.nodeId(),
+ return new GridDhtPartitionMap(ctx.localNodeId(),
updateSeq.get(),
topVer,
Collections.unmodifiableMap(map),
@@ -931,7 +937,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
- AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+ AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer);
List<ClusterNode> affNodes = affAssignment.get(p);
@@ -954,8 +960,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
", topVer2=" + this.topVer +
- ", node=" + cctx.igniteInstanceName() +
- ", cache=" + cctx.name() +
+ ", node=" + ctx.igniteInstanceName() +
+ ", grp=" + grp.name() +
", node2part=" + node2part + ']';
List<ClusterNode> nodes = null;
@@ -967,7 +973,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
HashSet<UUID> affIds = affAssignment.getIds(p);
if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) {
- ClusterNode n = cctx.discovery().node(nodeId);
+ ClusterNode n = ctx.discovery().node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null) {
@@ -1001,7 +1007,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtPartitionState state,
GridDhtPartitionState... states) {
Collection<UUID> allIds = topVer.topologyVersion() > 0 ?
- F.nodeIds(discoCache.cacheGroupAffinityNodes(cctx.group().groupId())) :
+ F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) :
null;
lock.readLock().lock();
@@ -1010,7 +1016,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
", allIds=" + allIds +
", node2part=" + node2part +
- ", cache=" + cctx.name() + ']';
+ ", grp=" + grp.name() + ']';
Collection<UUID> nodeIds = part2node.get(p);
@@ -1027,7 +1033,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
continue;
if (hasState(p, id, state, states)) {
- ClusterNode n = cctx.discovery().node(id);
+ ClusterNode n = ctx.discovery().node(id);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
nodes.add(n);
@@ -1043,7 +1049,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
- if (!cctx.rebalanceEnabled())
+ if (!grp.rebalanceEnabled())
return ownersAndMoving(p, topVer);
return nodes(p, topVer, OWNING);
@@ -1056,7 +1062,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public List<ClusterNode> moving(int p) {
- if (!cctx.rebalanceEnabled())
+ if (!grp.rebalanceEnabled())
return ownersAndMoving(p, AffinityTopologyVersion.NONE);
return nodes(p, AffinityTopologyVersion.NONE, MOVING);
@@ -1082,11 +1088,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
- ", cache=" + cctx.name() +
- ", started=" + cctx.started() +
+ ", grp=" + grp.name() +
", stopping=" + stopping +
- ", locNodeId=" + cctx.localNode().id() +
- ", locName=" + cctx.igniteInstanceName() + ']';
+ ", locNodeId=" + ctx.localNode().id() +
+ ", locName=" + ctx.igniteInstanceName() + ']';
GridDhtPartitionFullMap m = node2part;
@@ -1168,7 +1173,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// then we keep the newer value.
if (newPart != null &&
(newPart.updateSequence() < part.updateSequence() ||
- (cctx.cacheStartTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
+ (grp.groupStartVersion().compareTo(newPart.topologyVersion()) > 0))
) {
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
@@ -1182,7 +1187,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
- if (!cctx.discovery().alive(nodeId)) {
+ if (!ctx.discovery().alive(nodeId)) {
if (log.isDebugEnabled())
log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
partMap + ']');
@@ -1194,7 +1199,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
node2part = partMap;
- Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f);
+ Map<Integer, Set<UUID>> p2n = new HashMap<>(grp.affinity().partitions(), 1.0f);
for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
for (Integer p : e.getValue().keySet()) {
@@ -1213,11 +1218,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean changed = false;
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion affVer = grp.affinity().lastVersion();
- GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId());
+ GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
- if (nodeMap != null && cctx.shared().database().persistenceEnabled()) {
+ if (nodeMap != null && ctx.database().persistenceEnabled()) {
for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
int p = e.getKey();
GridDhtPartitionState state = e.getValue();
@@ -1244,7 +1249,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
changed |= checkEvictions(updateSeq, aff);
@@ -1257,7 +1262,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Partition map after full update: " + fullMapString());
if (changed)
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
return changed ? localPartitionMap() : null;
}
@@ -1276,7 +1281,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
- if (!cctx.discovery().alive(parts.nodeId())) {
+ if (!ctx.discovery().alive(parts.nodeId())) {
if (log.isDebugEnabled())
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
@@ -1371,10 +1376,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion affVer = grp.affinity().lastVersion();
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
changed |= checkEvictions(updateSeq, aff);
@@ -1387,7 +1392,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Partition map after single update: " + fullMapString());
if (changed)
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
return changed ? localPartitionMap() : null;
}
@@ -1401,7 +1406,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.writeLock().lock();
try {
- int parts = cctx.affinity().partitions();
+ int parts = grp.affinity().partitions();
Collection<Integer> lost = null;
@@ -1435,7 +1440,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
boolean changed = false;
if (lost != null) {
- PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+ PartitionLossPolicy plc = grp.config().getPartitionLossPolicy();
assert plc != null;
@@ -1467,13 +1472,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
- cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
- discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+// TODO: IGNITE-5075.
+// if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
+// cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
+// discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
if (plc != PartitionLossPolicy.IGNORE)
- cctx.needsRecovery(true);
+ grp.needsRecovery(true);
}
return changed;
@@ -1488,7 +1494,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.writeLock().lock();
try {
- int parts = cctx.affinity().partitions();
+ int parts = grp.affinity().partitions();
long updSeq = updateSeq.incrementAndGet();
for (int part = 0; part < parts; part++) {
@@ -1527,9 +1533,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- checkEvictions(updSeq, cctx.affinity().assignments(topVer));
+ checkEvictions(updSeq, grp.affinity().assignments(topVer));
- cctx.needsRecovery(false);
+ grp.needsRecovery(false);
}
finally {
lock.writeLock().unlock();
@@ -1543,7 +1549,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
Collection<Integer> res = null;
- int parts = cctx.affinity().partitions();
+ int parts = grp.affinity().partitions();
for (int part = 0; part < parts; part++) {
Set<UUID> nodeIds = part2node.get(part);
@@ -1580,7 +1586,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
GridDhtLocalPartition locPart = locParts.get(p);
if (locPart != null) {
- if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId()))
+ if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId()))
locPart.moving();
}
@@ -1605,12 +1611,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return {@code True} if state changed.
*/
private boolean checkEvictions(long updateSeq) {
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion affVer = grp.affinity().lastVersion();
boolean changed = false;
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
changed = checkEvictions(updateSeq, aff);
@@ -1642,12 +1648,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return Checks if any of the local partitions need to be evicted.
*/
private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
- if (!cctx.kernalContext().state().active())
+ if (!ctx.kernalContext().state().active())
return false;
boolean changed = false;
- UUID locId = cctx.nodeId();
+ UUID locId = ctx.localNodeId();
for (int p = 0; p < locParts.length(); p++) {
GridDhtLocalPartition part = locParts.get(p);
@@ -1660,7 +1666,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state.active()) {
List<ClusterNode> affNodes = aff.get(p);
- if (!affNodes.contains(cctx.localNode())) {
+ if (!affNodes.contains(ctx.localNode())) {
List<ClusterNode> nodes = nodes(p, topVer, OWNING);
Collection<UUID> nodeIds = F.nodeIds(nodes);
@@ -1723,10 +1729,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
- assert oldest != null || cctx.kernalContext().clientNode();
+ assert oldest != null || ctx.kernalContext().clientNode();
// If this node became the oldest node.
- if (cctx.localNode().equals(oldest)) {
+ if (ctx.localNode().equals(oldest)) {
long seq = node2part.updateSequence();
if (seq != updateSeq) {
@@ -1753,7 +1759,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
if (node2part != null) {
- UUID locNodeId = cctx.localNodeId();
+ UUID locNodeId = ctx.localNodeId();
GridDhtPartitionMap map = node2part.get(locNodeId);
@@ -1790,9 +1796,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
ClusterNode oldest = discoCache.oldestAliveServerNode();
- assert oldest != null || cctx.kernalContext().clientNode();
+ assert oldest != null || ctx.kernalContext().clientNode();
- ClusterNode loc = cctx.localNode();
+ ClusterNode loc = ctx.localNode();
if (node2part != null) {
if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
@@ -1937,11 +1943,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
- ", cache=" + cctx.name() +
- ", started=" + cctx.started() +
+ ", grp=" + grp.name() +
", stopping=" + stopping +
- ", locNodeId=" + cctx.localNode().id() +
- ", locName=" + cctx.igniteInstanceName() + ']';
+ ", locNodeId=" + ctx.localNodeId() +
+ ", locName=" + ctx.igniteInstanceName() + ']';
for (GridDhtPartitionMap map : node2part.values()) {
if (map.hasMovingPartitions())
@@ -1957,8 +1962,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
- X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
- ", cache=" + cctx.name() + ']');
+ X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() +
+ ", grp=" + grp.name() + ']');
lock.readLock().lock();
@@ -1986,7 +1991,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return {@code True} if given partition belongs to local node.
*/
private boolean localNode(int part, List<List<ClusterNode>> aff) {
- return aff.get(part).contains(cctx.localNode());
+ return aff.get(part).contains(ctx.localNode());
}
/**
@@ -1997,7 +2002,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (node2part == null || !node2part.valid())
return;
- for (int i = 0; i < cctx.affinity().partitions(); i++) {
+ for (int i = 0; i < grp.affinity().partitions(); i++) {
List<ClusterNode> affNodes = aff.get(i);
// Topology doesn't contain server nodes (just clients).
@@ -2013,7 +2018,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
rebalancedTopVer = topVer;
if (log.isDebugEnabled())
- log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+ log.debug("Updated rebalanced version [cache=" + grp.name() + ", ver=" + rebalancedTopVer + ']');
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c91eb7a..9b61f14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -118,10 +118,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
@Override public void start() throws IgniteCheckedException {
super.start();
- preldr = new GridDhtPreloader(ctx);
-
- preldr.start();
-
ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
@Override public void apply(UUID nodeId, GridNearGetRequest req) {
processNearGetRequest(nodeId, req);
@@ -382,7 +378,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
- ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+ ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion());
if (keyFut == null || keyFut.isDone()) {
if (keyFut != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 2292cb2..10ed584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -899,7 +899,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
) {
assert keys != null;
- IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+ IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer);
// Prevent embedded future creation if possible.
if (keyFut == null || keyFut.isDone()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 75cbd00..505df91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -38,13 +38,16 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -78,7 +81,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
public class GridDhtPartitionDemander {
/** */
- private final GridCacheContext<?, ?> cctx;
+ private final GridCacheSharedContext<?, ?> ctx;
+
+ /** */
+ private final CacheGroupInfrastructure grp;
/** */
private final IgniteLogger log;
@@ -116,16 +122,18 @@ public class GridDhtPartitionDemander {
private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
/**
- * @param cctx Cctx.
+ * @param grp Cache group.
*/
- public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
- assert cctx != null;
+ public GridDhtPartitionDemander(CacheGroupInfrastructure grp) {
+ assert grp != null;
+
+ this.grp = grp;
- this.cctx = cctx;
+ ctx = grp.shared();
- log = cctx.logger(getClass());
+ log = ctx.logger(getClass());
- boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+ boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode();
rebalanceFut = new RebalanceFuture();//Dummy.
@@ -137,7 +145,7 @@ public class GridDhtPartitionDemander {
Map<Integer, Object> tops = new HashMap<>();
- for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++)
+ for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++)
tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx));
rebalanceTopics = tops;
@@ -196,7 +204,7 @@ public class GridDhtPartitionDemander {
GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
if (obj != null)
- cctx.time().removeTimeoutObject(obj);
+ ctx.time().removeTimeoutObject(obj);
final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
@@ -208,7 +216,7 @@ public class GridDhtPartitionDemander {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut);
+ IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut);
fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> future) {
@@ -237,7 +245,7 @@ public class GridDhtPartitionDemander {
*/
private boolean topologyChanged(RebalanceFuture fut) {
return
- !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+ !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed.
fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
}
@@ -249,7 +257,8 @@ public class GridDhtPartitionDemander {
private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
assert discoEvt != null;
- cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+ // TODO IGNITE-5075.
+ // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
/**
@@ -279,12 +288,12 @@ public class GridDhtPartitionDemander {
assert force == (forcedRebFut != null);
- long delay = cctx.config().getRebalanceDelay();
+ long delay = grp.config().getRebalanceDelay();
if (delay == 0 || force) {
final RebalanceFuture oldFut = rebalanceFut;
- final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
+ final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, startedEvtSent, stoppedEvtSent, cnt);
if (!oldFut.isInitial())
oldFut.cancel();
@@ -330,7 +339,7 @@ public class GridDhtPartitionDemander {
fut.onDone(true);
- ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+ ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
fut.sendRebalanceFinishedEvent();
@@ -381,7 +390,7 @@ public class GridDhtPartitionDemander {
GridTimeoutObject obj = lastTimeoutObj.get();
if (obj != null)
- cctx.time().removeTimeoutObject(obj);
+ ctx.time().removeTimeoutObject(obj);
final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
@@ -391,7 +400,7 @@ public class GridDhtPartitionDemander {
@Override public void onTimeout() {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
- cctx.shared().exchange().forceRebalance(exchFut);
+ ctx.exchange().forceRebalance(exchFut);
}
});
}
@@ -399,7 +408,7 @@ public class GridDhtPartitionDemander {
lastTimeoutObj.set(obj);
- cctx.time().addTimeoutObject(obj);
+ ctx.time().addTimeoutObject(obj);
}
return null;
@@ -433,17 +442,19 @@ public class GridDhtPartitionDemander {
Collection<Integer> parts= e.getValue().partitions();
- assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
+ assert parts != null : "Partitions are null [grp=" + grp.name() + ", fromNode=" + nodeId + "]";
fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
}
}
+ final CacheConfiguration cfg = grp.config();
+
+ int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize();
+
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
final ClusterNode node = e.getKey();
- final CacheConfiguration cfg = cctx.config();
-
final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
GridDhtPartitionDemandMessage d = e.getValue();
@@ -452,8 +463,6 @@ public class GridDhtPartitionDemander {
", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
- int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
for (int cnt = 0; cnt < lsnrCnt; cnt++)
@@ -473,15 +482,15 @@ public class GridDhtPartitionDemander {
initD.topic(rebalanceTopics.get(cnt));
initD.updateSequence(fut.updateSeq);
- initD.timeout(cctx.config().getRebalanceTimeout());
+ initD.timeout(grp.config().getRebalanceTimeout());
synchronized (fut) {
if (fut.isDone())
return;// Future can be already cancelled at this moment and all failovers happened.
// New requests will not be covered by failovers.
- cctx.io().sendOrderedMessage(node,
- rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
+ ctx.io().sendOrderedMessage(node,
+ rebalanceTopics.get(cnt), initD, grp.ioPolicy(), initD.timeout());
}
@@ -505,11 +514,11 @@ public class GridDhtPartitionDemander {
for (Integer part : parts) {
try {
- if (cctx.shared().database().persistenceEnabled()) {
+ if (ctx.database().persistenceEnabled()) {
if (partCntrs == null)
partCntrs = new HashMap<>(parts.size(), 1.0f);
- GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false);
+ GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false);
partCntrs.put(part, p.initialUpdateCounter());
}
@@ -585,7 +594,7 @@ public class GridDhtPartitionDemander {
final RebalanceFuture fut = rebalanceFut;
- ClusterNode node = cctx.node(id);
+ ClusterNode node = ctx.node(id);
if (node == null)
return;
@@ -609,14 +618,16 @@ public class GridDhtPartitionDemander {
return;
}
- final GridDhtPartitionTopology top = cctx.dht().topology();
+ final GridDhtPartitionTopology top = grp.topology();
try {
+ AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
int p = e.getKey();
- if (cctx.affinity().partitionLocalNode(p, topVer)) {
+ if (aff.get(p).contains(ctx.localNode())) {
GridDhtLocalPartition part = top.localPartition(p, topVer, true);
assert part != null;
@@ -627,7 +638,7 @@ public class GridDhtPartitionDemander {
boolean reserved = part.reserve();
assert reserved : "Failed to reserve partition [igniteInstanceName=" +
- cctx.igniteInstanceName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
+ ctx.igniteInstanceName() + ", grp=" + grp.name() + ", part=" + part + ']';
part.lock();
@@ -686,7 +697,7 @@ public class GridDhtPartitionDemander {
// Only request partitions based on latest topology version.
for (Integer miss : supply.missed()) {
- if (cctx.affinity().partitionLocalNode(miss, topVer))
+ if (aff.get(miss).contains(ctx.localNode()))
fut.partitionMissed(id, miss);
}
@@ -696,14 +707,14 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
- d.timeout(cctx.config().getRebalanceTimeout());
+ d.timeout(grp.config().getRebalanceTimeout());
d.topic(rebalanceTopics.get(idx));
if (!topologyChanged(fut) && !fut.isDone()) {
// Send demand message.
- cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
- d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+ ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
+ d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
}
}
catch (IgniteCheckedException e) {
@@ -732,12 +743,15 @@ public class GridDhtPartitionDemander {
GridCacheEntryInfo entry,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
try {
GridCacheEntryEx cached = null;
try {
+ // TODO IGNITE-5075.
+ GridCacheContext cctx = grp.cacheContext();
+
cached = cctx.dht().entryEx(entry.key());
if (log.isDebugEnabled())
@@ -789,10 +803,10 @@ public class GridDhtPartitionDemander {
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
- cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+ ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
return true;
@@ -810,6 +824,12 @@ public class GridDhtPartitionDemander {
/** */
private static final long serialVersionUID = 1L;
+ /** */
+ private final GridCacheSharedContext<?, ?> ctx;
+
+ /** */
+ private final CacheGroupInfrastructure grp;
+
/** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
private final AtomicBoolean startedEvtSent;
@@ -817,9 +837,6 @@ public class GridDhtPartitionDemander {
private final AtomicBoolean stoppedEvtSent;
/** */
- private final GridCacheContext<?, ?> cctx;
-
- /** */
private final IgniteLogger log;
/** Remaining. T2: startTime, partitions */
@@ -840,14 +857,15 @@ public class GridDhtPartitionDemander {
/**
* @param assigns Assigns.
- * @param cctx Context.
+ * @param grp Cache group.
* @param log Logger.
* @param startedEvtSent Start event sent flag.
* @param stoppedEvtSent Stop event sent flag.
* @param updateSeq Update sequence.
*/
- RebalanceFuture(GridDhtPreloaderAssignments assigns,
- GridCacheContext<?, ?> cctx,
+ RebalanceFuture(
+ CacheGroupInfrastructure grp,
+ GridDhtPreloaderAssignments assigns,
IgniteLogger log,
AtomicBoolean startedEvtSent,
AtomicBoolean stoppedEvtSent,
@@ -856,20 +874,23 @@ public class GridDhtPartitionDemander {
this.exchFut = assigns.exchangeFuture();
this.topVer = assigns.topologyVersion();
- this.cctx = cctx;
+ this.grp = grp;
this.log = log;
this.startedEvtSent = startedEvtSent;
this.stoppedEvtSent = stoppedEvtSent;
this.updateSeq = updateSeq;
+
+ ctx= grp.shared();
}
/**
* Dummy future. Will be done by real one.
*/
- public RebalanceFuture() {
+ RebalanceFuture() {
this.exchFut = null;
this.topVer = null;
- this.cctx = null;
+ this.ctx = null;
+ this.grp = null;
this.log = null;
this.startedEvtSent = null;
this.stoppedEvtSent = null;
@@ -910,7 +931,7 @@ public class GridDhtPartitionDemander {
U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
- if (!cctx.kernalContext().isStopping()) {
+ if (!ctx.kernalContext().isStopping()) {
for (UUID nodeId : remaining.keySet())
cleanupRemoteContexts(nodeId);
}
@@ -931,7 +952,7 @@ public class GridDhtPartitionDemander {
if (isDone())
return;
- U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+ U.log(log, ("Cancelled rebalancing [grp=" + grp.name() +
", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
@@ -966,7 +987,7 @@ public class GridDhtPartitionDemander {
* @param nodeId Node id.
*/
private void cleanupRemoteContexts(UUID nodeId) {
- ClusterNode node = cctx.discovery().node(nodeId);
+ ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
return;
@@ -974,14 +995,14 @@ public class GridDhtPartitionDemander {
GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
- d.timeout(cctx.config().getRebalanceTimeout());
+ d.timeout(grp.config().getRebalanceTimeout());
try {
- for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+ for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
- cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
- d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+ ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+ d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
}
}
catch (IgniteCheckedException ignored) {
@@ -999,20 +1020,21 @@ public class GridDhtPartitionDemander {
if (isDone())
return;
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
- preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
- exchFut.discoveryEvent());
+ // TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+// preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
+// exchFut.discoveryEvent());
T2<Long, Collection<Integer>> t = remaining.get(nodeId);
- assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
+ assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId +
", part=" + p + "]";
Collection<Integer> parts = t.get2();
boolean rmvd = parts.remove(p);
- assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
+ assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId +
", part=" + p + ", left=" + parts + "]";
if (parts.isEmpty()) {
@@ -1035,7 +1057,8 @@ public class GridDhtPartitionDemander {
private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
assert discoEvt != null;
- cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+ // TODO IGNITE-5075.
+ // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
/**
@@ -1063,7 +1086,7 @@ public class GridDhtPartitionDemander {
if (log.isDebugEnabled())
log.debug("Completed rebalance future: " + this);
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
Collection<Integer> m = new HashSet<>();
@@ -1077,13 +1100,13 @@ public class GridDhtPartitionDemander {
onDone(false); //Finished but has missed partitions, will force dummy exchange
- cctx.shared().exchange().forceDummyExchange(true, exchFut);
+ ctx.exchange().forceDummyExchange(true, exchFut);
return;
}
- if (!cancelled && !cctx.preloader().syncFuture().isDone())
- ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+ if (!cancelled && !grp.preloader().syncFuture().isDone())
+ ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
onDone(!cancelled);
}
@@ -1093,24 +1116,26 @@ public class GridDhtPartitionDemander {
*
*/
private void sendRebalanceStartedEvent() {
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
- (!cctx.isReplicated() || !startedEvtSent.get())) {
- preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
-
- startedEvtSent.set(true);
- }
+ // TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
+// (!cctx.isReplicated() || !startedEvtSent.get())) {
+// preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
+//
+// startedEvtSent.set(true);
+// }
}
/**
*
*/
private void sendRebalanceFinishedEvent() {
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
- (!cctx.isReplicated() || !stoppedEvtSent.get())) {
- preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
- stoppedEvtSent.set(true);
- }
+ // TODO IGNITE-5075.
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
+// (!cctx.isReplicated() || !stoppedEvtSent.get())) {
+// preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
+//
+// stoppedEvtSent.set(true);
+// }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f7f0aff..84c3d23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -26,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
@@ -47,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
*/
class GridDhtPartitionSupplier {
/** */
- private final GridCacheContext<?, ?> cctx;
+ private final CacheGroupInfrastructure grp;
/** */
private final IgniteLogger log;
@@ -65,18 +67,18 @@ class GridDhtPartitionSupplier {
private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
*/
- GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
- assert cctx != null;
+ GridDhtPartitionSupplier(CacheGroupInfrastructure grp) {
+ assert grp != null;
- this.cctx = cctx;
+ this.grp = grp;
- log = cctx.logger(getClass());
+ log = grp.shared().logger(getClass());
- top = cctx.dht().topology();
+ top = grp.topology();
- depEnabled = cctx.gridDeploy().enabled();
+ depEnabled = grp.shared().gridDeploy().enabled();
}
/**
@@ -171,7 +173,7 @@ class GridDhtPartitionSupplier {
assert d != null;
assert id != null;
- AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion cutTop = grp.affinity().lastVersion();
AffinityTopologyVersion demTop = d.topologyVersion();
T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
@@ -199,7 +201,7 @@ class GridDhtPartitionSupplier {
GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
- ClusterNode node = cctx.discovery().node(id);
+ ClusterNode node = grp.shared().discovery().node(id);
if (node == null)
return; // Context will be cleaned at topology change.
@@ -225,7 +227,7 @@ class GridDhtPartitionSupplier {
boolean newReq = true;
- long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
+ long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount();
if (sctx != null) {
phase = sctx.phase;
@@ -234,7 +236,7 @@ class GridDhtPartitionSupplier {
}
else {
if (log.isDebugEnabled())
- log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+ log.debug("Starting supplying rebalancing [grp=" + grp.name() +
", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
", idx=" + idx + "]");
@@ -280,7 +282,7 @@ class GridDhtPartitionSupplier {
IgniteRebalanceIterator iter;
if (sctx == null || sctx.entryIt == null) {
- iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
+ iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
if (!iter.historical())
s.clean(part);
@@ -289,7 +291,9 @@ class GridDhtPartitionSupplier {
iter = (IgniteRebalanceIterator)sctx.entryIt;
while (iter.hasNext()) {
- if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
+ List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part);
+
+ if (!nodes.contains(node)) {
// Demander no longer needs this partition,
// so we send '-1' partition and move on.
s.missed(part);
@@ -313,7 +317,7 @@ class GridDhtPartitionSupplier {
break;
}
- if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+ if (s.messageSize() >= grp.config().getRebalanceBatchSize()) {
if (++bCnt >= maxBatchesCnt) {
saveSupplyContext(scId,
phase,
@@ -400,7 +404,7 @@ class GridDhtPartitionSupplier {
reply(node, d, s, scId);
if (log.isDebugEnabled())
- log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+ log.debug("Finished supplying rebalancing [grp=" + grp.name() +
", fromNode=" + node.id() +
", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
", idx=" + idx + "]");
@@ -427,16 +431,15 @@ class GridDhtPartitionSupplier {
GridDhtPartitionSupplyMessage s,
T3<UUID, Integer, AffinityTopologyVersion> scId)
throws IgniteCheckedException {
-
try {
if (log.isDebugEnabled())
log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
- cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+ grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout());
// Throttle preloading.
- if (cctx.config().getRebalanceThrottle() > 0)
- U.sleep(cctx.config().getRebalanceThrottle());
+ if (grp.config().getRebalanceThrottle() > 0)
+ U.sleep(grp.config().getRebalanceThrottle());
return true;
}
@@ -469,7 +472,7 @@ class GridDhtPartitionSupplier {
AffinityTopologyVersion topVer,
long updateSeq) {
synchronized (scMap) {
- if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+ if (grp.affinity().lastVersion().equals(topVer)) {
assert scMap.get(t) == null;
scMap.put(t,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index ee461ab..016ec6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -206,32 +206,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
* @param ctx Cache context.
* @throws IgniteCheckedException If failed.
*/
- void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
- assert info != null;
-
- marshalInfo(info, ctx);
-
- msgSize += info.marshalledSize(ctx);
-
- CacheEntryInfoCollection infoCol = infos().get(p);
-
- if (infoCol == null) {
- msgSize += 4;
-
- infos().put(p, infoCol = new CacheEntryInfoCollection());
-
- infoCol.init();
- }
-
- infoCol.add(info);
- }
-
- /**
- * @param p Partition.
- * @param info Entry to add.
- * @param ctx Cache context.
- * @throws IgniteCheckedException If failed.
- */
void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
assert info != null;
assert (info.key() != null || info.keyBytes() != null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 74bbcb0..3f74f75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -87,10 +87,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @return Partition update counters.
*/
- public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId);
+ public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
/**
* @return Last used version among all nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 499537d..1e656b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscovery
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
@@ -644,7 +645,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
- top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+ top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId()));
}
/**
@@ -738,14 +739,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (crd != null) {
if (crd.isLocal()) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- boolean updateTop = !cacheCtx.isLocal() &&
- exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ boolean updateTop = !grp.isLocal() &&
+ exchId.topologyVersion().equals(grp.localStartVersion());
if (updateTop) {
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- if (top.cacheId() == cacheCtx.cacheId()) {
- cacheCtx.topology().update(exchId,
+ if (top.groupId() == grp.groupId()) {
+ grp.topology().update(exchId,
top.partitionMap(true),
top.updateCounters(false));
@@ -766,8 +767,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
else {
if (centralizedAff) { // Last server node failed.
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ GridAffinityAssignmentCache aff = grp.affinity();
aff.initialize(topologyVersion(), aff.idealAssignment());
}
@@ -1009,6 +1010,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
U.dumpThreads(log);
}
+ public boolean cacheGroupStopping(int grpId) {
+ return exchActions != null && exchActions.cacheGroupStopping(grpId);
+ }
+
/**
* @param cacheId Cache ID to check.
* @return {@code True} if cache is stopping by this exchange.
@@ -1456,9 +1461,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) {
- assert e.getValue().partitionUpdateCounters(top.cacheId()) != null;
+ assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
- for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) {
+ for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
int p = e0.getKey();
UUID uuid = e.getKey();
@@ -1755,19 +1760,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.versions().onExchange(msg.lastVersion().order());
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
- Integer cacheId = entry.getKey();
+ Integer grpId = entry.getKey();
- Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId);
+ Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
- if (cacheCtx != null)
- cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
+ if (grp != null)
+ grp.topology().update(exchId, entry.getValue(), cntrMap);
else {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
- cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
+ cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap);
}
}
}
@@ -1781,13 +1786,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
msgs.put(node.id(), msg);
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
- Integer cacheId = entry.getKey();
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ Integer grpId = entry.getKey();
+ CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
- GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
- cctx.exchange().clientTopology(cacheId, this);
+ GridDhtPartitionTopology top = grp != null ? grp.topology() :
+ cctx.exchange().clientTopology(grpId, this);
- top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+ top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(grpId));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 73a3481..b327ad1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -122,24 +122,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @return {@code True} if message contains full map for given cache group.
*/
- public boolean containsCache(int cacheId) {
- return parts != null && parts.containsKey(cacheId);
+ public boolean containsGroup(int grpId) {
+ return parts != null && parts.containsKey(grpId);
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param fullMap Full partitions map.
* @param dupDataCache Optional ID of cache with the same partition state map.
*/
- public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
+ public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) {
if (parts == null)
parts = new HashMap<>();
- if (!parts.containsKey(cacheId)) {
- parts.put(cacheId, fullMap);
+ if (!parts.containsKey(grpId)) {
+ parts.put(grpId, fullMap);
if (dupDataCache != null) {
assert compress;
@@ -148,30 +148,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (dupPartsData == null)
dupPartsData = new HashMap<>();
- dupPartsData.put(cacheId, dupDataCache);
+ dupPartsData.put(grpId, dupDataCache);
}
}
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param cntrMap Partition update counters.
*/
- public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+ public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
if (partCntrs == null)
partCntrs = new HashMap<>();
- if (!partCntrs.containsKey(cacheId))
- partCntrs.put(cacheId, cntrMap);
+ if (!partCntrs.containsKey(grpId))
+ partCntrs.put(grpId, cntrMap);
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @return Partition update counters.
*/
- @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
+ @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
if (partCntrs != null) {
- Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
+ Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
}
@@ -427,7 +427,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
return 46;
}
- //todo
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 11;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index e197864..f2c9158 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -127,7 +128,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
DiscoveryEvent e = (DiscoveryEvent)evt;
try {
- ClusterNode loc = cctx.localNode();
+ ClusterNode loc = ctx.localNode();
assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED;
@@ -148,12 +149,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
};
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
*/
- public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
- super(cctx);
+ public GridDhtPreloader(CacheGroupInfrastructure grp) {
+ super(grp);
- top = cctx.dht().topology();
+ top = grp.topology();
startFut = new GridFutureAdapter<>();
}
@@ -163,26 +164,26 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (log.isDebugEnabled())
log.debug("Starting DHT rebalancer...");
- cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class,
+ ctx.io().addHandler(grp.groupId(), GridDhtForceKeysRequest.class,
new MessageHandler<GridDhtForceKeysRequest>() {
@Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
processForceKeysRequest(node, msg);
}
});
- cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class,
+ ctx.io().addHandler(grp.groupId(), GridDhtForceKeysResponse.class,
new MessageHandler<GridDhtForceKeysResponse>() {
@Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
processForceKeyResponse(node, msg);
}
});
- supplier = new GridDhtPartitionSupplier(cctx);
+ supplier = new GridDhtPartitionSupplier(grp);
demander = new GridDhtPartitionDemander(cctx);
demander.start();
- cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
/** {@inheritDoc} */
@@ -203,7 +204,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
stopping = true;
- cctx.events().removeListener(discoLsnr);
+ ctx.gridEvents().removeLocalEventListener(discoLsnr);
// Acquire write busy lock.
busyLock.writeLock().lock();
@@ -253,25 +254,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
// No assignments for disabled preloader.
- GridDhtPartitionTopology top = cctx.dht().topology();
+ GridDhtPartitionTopology top = grp.topology();
- if (!cctx.rebalanceEnabled() || !cctx.shared().kernalContext().state().active())
+ if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active())
return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
- int partCnt = cctx.affinity().partitions();
+ int partCnt = grp.affinity().partitions();
assert exchFut.forcePreload() || exchFut.dummyReassign() ||
exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
"Topology version mismatch [exchId=" + exchFut.exchangeId() +
- ", cache=" + cctx.name() +
+ ", grp=" + grp.name() +
", topVer=" + top.topologyVersion() + ']';
GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
AffinityTopologyVersion topVer = assigns.topologyVersion();
+ AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
for (int p = 0; p < partCnt; p++) {
- if (cctx.shared().exchange().hasPendingExchange()) {
+ if (ctx.exchange().hasPendingExchange()) {
if (log.isDebugEnabled())
log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
exchFut.exchangeId());
@@ -282,7 +285,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
// If partition belongs to local node.
- if (cctx.affinity().partitionLocalNode(p, topVer)) {
+ if (aff.get(p).contains(ctx.localNode())) {
GridDhtLocalPartition part = top.localPartition(p, topVer, true);
assert part != null;
@@ -300,13 +303,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (picked.isEmpty()) {
top.own(part);
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
- DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-
- cctx.events().addPreloadEvent(p,
- EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
- discoEvt.type(), discoEvt.timestamp());
- }
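+// TODO IGNITE-5075: restore recording of EVT_CACHE_REBALANCE_PART_DATA_LOST (block below is disabled and still references cctx.events()).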
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+// DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+//
+// cctx.events().addPreloadEvent(p,
+// EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+// discoEvt.type(), discoEvt.timestamp());
+// }
if (log.isDebugEnabled())
log.debug("Owning partition as there are no other owners: " + part);
@@ -342,7 +346,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Picked owners.
*/
private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
- Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, topVer);
+ Collection<ClusterNode> affNodes = grp.affinity().cachedAffinity(topVer).get(p);
int affCnt = affNodes.size();
@@ -368,7 +372,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Nodes owning this partition.
*/
private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
- return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
+ return F.view(grp.topology().owners(p, topVer), F.remoteNodes(ctx.localNodeId()));
}
/** {@inheritDoc} */
@@ -423,12 +427,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+ return ctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
- return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
+ return ctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
}
/**
@@ -459,7 +463,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @param msg Force keys message.
*/
private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
- IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
+ IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
if (fut.isDone())
processForceKeysRequest0(node, msg);
@@ -480,6 +484,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
return;
try {
+ GridCacheContext cctx = ctx.cacheContext(msg.cacheId());
+
ClusterNode loc = cctx.localNode();
GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
@@ -591,11 +597,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
try {
top.onEvicted(part, updateSeq);
- if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
- cctx.events().addUnloadEvent(part.id());
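+// TODO IGNITE-5075: restore recording of EVT_CACHE_REBALANCE_PART_UNLOADED (block below is disabled and still references cctx.events()).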
+// if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+// cctx.events().addUnloadEvent(part.id());
if (updateSeq)
- cctx.shared().exchange().scheduleResendPartitions();
+ ctx.exchange().scheduleResendPartitions();
}
finally {
leaveBusy();
@@ -604,7 +611,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public boolean needForceKeys() {
- if (cctx.rebalanceEnabled()) {
+ if (grp.rebalanceEnabled()) {
IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result()))
@@ -615,12 +622,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+ @Override public IgniteInternalFuture<Object> request(GridCacheContext cctx,
+ GridNearAtomicAbstractUpdateRequest req,
AffinityTopologyVersion topVer) {
if (!needForceKeys())
return null;
- return request0(req.keys(), topVer);
+ return request0(cctx, req.keys(), topVer);
}
/**
@@ -628,11 +636,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Future for request.
*/
@SuppressWarnings({"unchecked", "RedundantCast"})
- @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ @Override public GridDhtFuture<Object> request(GridCacheContext cctx,
+ Collection<KeyCacheObject> keys,
+ AffinityTopologyVersion topVer) {
if (!needForceKeys())
return null;
- return request0(keys, topVer);
+ return request0(cctx, keys, topVer);
}
/**
@@ -641,7 +651,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
* @return Future for request.
*/
@SuppressWarnings({"unchecked", "RedundantCast"})
- private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+ private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
@@ -652,7 +662,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (topReadyFut == null)
startFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> syncFut) {
- cctx.kernalContext().closure().runLocalSafe(
+ ctx.kernalContext().closure().runLocalSafe(
new GridPlainRunnable() {
@Override public void run() {
fut.init();
@@ -689,7 +699,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
demandLock.writeLock().lock();
try {
- cctx.deploy().unwind(cctx);
+ // TODO IGNITE-5075: restore the cctx.deploy().unwind(cctx) call disabled below; the try body is empty until then.
+ // cctx.deploy().unwind(cctx);
}
finally {
demandLock.writeLock().unlock();
@@ -728,7 +739,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
partsToEvict.putIfAbsent(part.id(), part);
if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) {
- cctx.closures().callLocalSafe(new GPC<Boolean>() {
+ ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
@Override public Boolean call() {
boolean locked = true;
@@ -749,7 +760,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
partsToEvict.put(part.id(), part);
}
catch (Throwable ex) {
- if (cctx.kernalContext().isStopping()) {
+ if (ctx.kernalContext().isStopping()) {
LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
false,
true);
@@ -785,7 +796,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void dumpDebugInfo() {
if (!forceKeyFuts.isEmpty()) {
- U.warn(log, "Pending force key futures [cache=" + cctx.name() + "]:");
+ U.warn(log, "Pending force key futures [grp=" + grp.name() + "]:");
for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
U.warn(log, ">>> " + fut);
@@ -803,7 +814,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public void apply(UUID nodeId, M msg) {
- ClusterNode node = cctx.node(nodeId);
+ ClusterNode node = ctx.node(nodeId);
if (node == null) {
if (log.isDebugEnabled())