You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/11 11:18:58 UTC
ignite git commit: ignite-5446 Only lateAffinity logic in
CacheAffinitySharedManager.
Repository: ignite
Updated Branches:
refs/heads/master bf25b5c52 -> e93b28488
ignite-5446 Only lateAffinity logic in CacheAffinitySharedManager.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e93b2848
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e93b2848
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e93b2848
Branch: refs/heads/master
Commit: e93b28488693381fcd232de93087ab8ec1d0f5bb
Parents: bf25b5c
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jul 11 14:18:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jul 11 14:18:52 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 211 +++++++------------
1 file changed, 74 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e93b2848/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 548d795..79ab183 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -83,9 +83,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
private final long clientCacheMsgTimeout =
IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000);
- /** Late affinity assignment flag. */
- private boolean lateAffAssign;
-
/** Affinity information for all started caches (initialized on coordinator). */
private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>();
@@ -132,13 +129,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
@Override protected void start0() throws IgniteCheckedException {
super.start0();
- if (cctx.database().persistenceEnabled() && !cctx.kernalContext().config().isLateAffinityAssignment())
- U.quietAndWarn(log,
- "Persistence is enabled, but late affinity assignment is disabled. " +
- "Since it is required for persistence mode, it will be implicitly enabled.");
-
- lateAffAssign = cctx.kernalContext().config().isLateAffinityAssignment() || cctx.database().persistenceEnabled();
-
cctx.kernalContext().event().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
@@ -193,8 +183,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @return {@code True} if minor topology version should be increased.
*/
boolean onCustomEvent(CacheAffinityChangeMessage msg) {
- assert lateAffAssign : msg;
-
if (msg.exchangeId() != null) {
if (log.isDebugEnabled()) {
log.debug("Ignore affinity change message [lastAffVer=" + lastAffVer +
@@ -259,9 +247,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param checkGrpId Group ID.
*/
void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
- if (!lateAffAssign)
- return;
-
CacheAffinityChangeMessage msg = null;
synchronized (mux) {
@@ -349,13 +334,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @param grp Cache group.
*/
void onCacheGroupCreated(CacheGroupContext grp) {
- final Integer grpId = grp.groupId();
-
if (!grpHolders.containsKey(grp.groupId())) {
- cctx.io().addCacheGroupHandler(grpId, GridDhtAffinityAssignmentResponse.class,
+ cctx.io().addCacheGroupHandler(grp.groupId(), GridDhtAffinityAssignmentResponse.class,
new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
@Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- processAffinityAssignmentResponse(grpId, nodeId, res);
+ processAffinityAssignmentResponse(nodeId, res);
}
});
}
@@ -692,7 +675,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
caches.updateCachesInfo(exchActions);
// Affinity did not change for existing caches.
- forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
if (exchActions.cacheGroupStopping(aff.groupId()))
return;
@@ -772,7 +755,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
Integer grpId = action.descriptor().groupId();
if (gprs.add(grpId)) {
- if (crd && lateAffAssign)
+ if (crd)
initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());
else {
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
@@ -813,7 +796,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
Set<Integer> stoppedGrps = null;
- if (crd && lateAffAssign) {
+ if (crd) {
for (ExchangeActions.CacheGroupActionData data : exchActions.cacheGroupsToStop()) {
if (data.descriptor().config().getCacheMode() != LOCAL) {
CacheGroupHolder cacheGrp = grpHolders.remove(data.descriptor().groupId());
@@ -1026,32 +1009,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
public void onClientEvent(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
- if (lateAffAssign) {
- if (!locJoin) {
- forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
- @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
- AffinityTopologyVersion topVer = fut.topologyVersion();
-
- aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
- }
- });
- }
- else
- fetchAffinityOnJoin(fut);
- }
- else {
- if (!locJoin) {
- forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
- @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
- AffinityTopologyVersion topVer = fut.topologyVersion();
+ if (!locJoin) {
+ forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = fut.topologyVersion();
- aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
- }
- });
- }
- else
- initAffinityNoLateAssignment(fut);
+ aff.clientEventTopologyChange(fut.discoveryEvent(), topVer);
+ }
+ });
}
+ else
+ fetchAffinityOnJoin(fut);
}
/**
@@ -1074,11 +1042,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param grpId Cache group ID.
* @param nodeId Node ID.
* @param res Response.
*/
- private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) {
+ private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
if (log.isDebugEnabled())
log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
@@ -1093,8 +1060,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @throws IgniteCheckedException If failed
*/
private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException {
- assert lateAffAssign;
-
for (CacheGroupDescriptor cacheDesc : caches.allGroups()) {
if (cacheDesc.config().getCacheMode() == LOCAL)
continue;
@@ -1179,10 +1144,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
) throws IgniteCheckedException {
caches.initStartedCaches(descs);
- if (crd && lateAffAssign) {
+ if (crd) {
forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
- CacheGroupHolder cache = groupHolder(fut, desc);
+ CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc);
if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
List<List<ClusterNode>> assignment =
@@ -1247,7 +1212,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert desc != null : aff.cacheOrGroupName();
// Do not request affinity from remote nodes if affinity function is not centralized.
- if (!lateAffAssign && !aff.centralizedAffinityFunction())
+ if (!aff.centralizedAffinityFunction())
return true;
// If local node did not initiate exchange or local node is the only cache node in grid.
@@ -1272,31 +1237,31 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
WaitRebalanceInfo waitRebalanceInfo = null;
- if (lateAffAssign) {
- if (locJoin) {
- if (crd) {
- forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
- @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
- AffinityTopologyVersion topVer = fut.topologyVersion();
+ if (locJoin) {
+ if (crd) {
+ forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+ @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = fut.topologyVersion();
- CacheGroupHolder cache = groupHolder(fut, desc);
+ CacheGroupHolder cache = groupHolder(topVer, desc);
- List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
- fut.discoveryEvent(),
- fut.discoCache());
+ List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
+ fut.discoveryEvent(),
+ fut.discoCache());
- cache.affinity().initialize(topVer, newAff);
- }
- });
- }
- else
- fetchAffinityOnJoin(fut);
+ cache.affinity().initialize(topVer, newAff);
+ }
+ });
}
else
- waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd);
+ fetchAffinityOnJoin(fut);
+ }
+ else {
+ waitRebalanceInfo = initAffinityOnNodeJoin(fut.topologyVersion(),
+ fut.discoveryEvent(),
+ fut.discoCache(),
+ crd);
}
- else
- initAffinityNoLateAssignment(fut);
synchronized (mux) {
affCalcVer = fut.topologyVersion();
@@ -1305,7 +1270,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
WaitRebalanceInfo info = this.waitInfo;
- if (crd && lateAffAssign) {
+ if (crd) {
if (log.isDebugEnabled()) {
log.debug("Computed new affinity after node join [topVer=" + fut.topologyVersion() +
", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
@@ -1412,7 +1377,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (idealAff != null)
affCache.idealAssignment(idealAff);
else {
- assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
+ assert !affCache.centralizedAffinityFunction();
affCache.calculate(topVer, discoveryEvt, discoCache);
}
@@ -1439,22 +1404,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert !leftNode.isClient() : leftNode;
- boolean centralizedAff;
-
- if (lateAffAssign) {
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
-
- grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
- }
-
- centralizedAff = true;
- }
- else {
- initAffinityNoLateAssignment(fut);
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
+ continue;
- centralizedAff = false;
+ grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
}
synchronized (mux) {
@@ -1463,22 +1417,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
this.waitInfo = null;
}
- return centralizedAff;
- }
-
- /**
- * @param fut Exchange future.
- * @throws IgniteCheckedException If failed.
- */
- private void initAffinityNoLateAssignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
- assert !lateAffAssign;
-
- for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
- if (grp.isLocal())
- continue;
-
- initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
- }
+ return true;
}
/**
@@ -1488,8 +1427,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
*/
private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
throws IgniteCheckedException {
- assert lateAffAssign;
-
final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@@ -1512,7 +1449,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class,
new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
@Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- processAffinityAssignmentResponse(grpId, nodeId, res);
+ processAffinityAssignmentResponse(nodeId, res);
}
}
);
@@ -1587,15 +1524,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param fut Exchange future.
+ * @param topVer Topology version.
* @param desc Cache descriptor.
* @return Cache holder.
* @throws IgniteCheckedException If failed.
*/
- private CacheGroupHolder groupHolder(GridDhtPartitionsExchangeFuture fut, final CacheGroupDescriptor desc)
+ private CacheGroupHolder groupHolder(AffinityTopologyVersion topVer, final CacheGroupDescriptor desc)
throws IgniteCheckedException {
- assert lateAffAssign;
-
CacheGroupHolder cacheGrp = grpHolders.get(desc.groupId());
if (cacheGrp != null)
@@ -1607,12 +1542,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class,
new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() {
@Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
- processAffinityAssignmentResponse(desc.groupId(), nodeId, res);
+ processAffinityAssignmentResponse(nodeId, res);
}
}
);
- cacheGrp = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null);
+ cacheGrp = CacheGroupHolder2.create(cctx, desc, topVer, null);
}
else
cacheGrp = new CacheGroupHolder1(grp, null);
@@ -1625,17 +1560,18 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param fut Exchange future.
+ * @param topVer Topology version.
+ * @param evt Discovery event.
+ * @param discoCache Discovery data cache.
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return Rabalance info.
*/
- @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd)
+ @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final AffinityTopologyVersion topVer,
+ final DiscoveryEvent evt,
+ final DiscoCache discoCache,
+ boolean crd)
throws IgniteCheckedException {
- assert lateAffAssign;
-
- AffinityTopologyVersion topVer = fut.topologyVersion();
-
final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
if (!crd) {
@@ -1645,7 +1581,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
boolean latePrimary = grp.rebalanceEnabled();
- initAffinityOnNodeJoin(fut, grp.affinity(), null, latePrimary, affCache);
+ initAffinityOnNodeJoin(topVer, evt, discoCache, grp.affinity(), null, latePrimary, affCache);
}
return null;
@@ -1655,11 +1591,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
- CacheGroupHolder cache = groupHolder(fut, desc);
+ CacheGroupHolder cache = groupHolder(topVer, desc);
boolean latePrimary = cache.rebalanceEnabled;
- initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
+ initAffinityOnNodeJoin(topVer,
+ evt,
+ discoCache,
+ cache.affinity(),
+ waitRebalanceInfo,
+ latePrimary,
+ affCache);
}
});
@@ -1668,24 +1610,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param fut Exchange future.
+ * @param topVer Topology version.
+ * @param evt Discovery event.
+ * @param discoCache Discovery data cache.
* @param aff Affinity.
* @param rebalanceInfo Rebalance information.
* @param latePrimary If {@code true} delays primary assignment if it is not owner.
* @param affCache Already calculated assignments (to reduce data stored in history).
* @throws IgniteCheckedException If failed.
*/
- private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
+ private void initAffinityOnNodeJoin(AffinityTopologyVersion topVer,
+ DiscoveryEvent evt,
+ DiscoCache discoCache,
GridAffinityAssignmentCache aff,
WaitRebalanceInfo rebalanceInfo,
boolean latePrimary,
Map<Object, List<List<ClusterNode>>> affCache)
throws IgniteCheckedException
{
- assert lateAffAssign;
-
- AffinityTopologyVersion topVer = fut.topologyVersion();
-
AffinityTopologyVersion affTopVer = aff.lastVersion();
assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() +
@@ -1695,7 +1637,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
assert aff.idealAssignment() != null : "Previous assignment is not available.";
- List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+ List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, evt, discoCache);
List<List<ClusterNode>> newAssignment = null;
if (latePrimary) {
@@ -1726,7 +1668,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (newAssignment == null)
newAssignment = idealAssignment;
- aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+ aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
}
/**
@@ -1762,7 +1704,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
ClusterNode curPrimary,
List<ClusterNode> newNodes,
WaitRebalanceInfo rebalance) {
- assert lateAffAssign;
assert curPrimary != null;
assert !F.isEmpty(newNodes);
assert !curPrimary.equals(newNodes.get(0));
@@ -1791,8 +1732,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
*/
public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinityOnNodeLeft(
final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
- assert lateAffAssign;
-
IgniteInternalFuture<?> initFut = initCoordinatorCaches(fut);
if (initFut != null && !initFut.isDone()) {
@@ -1822,8 +1761,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
*/
private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
throws IgniteCheckedException {
- assert lateAffAssign;
-
final AffinityTopologyVersion topVer = fut.topologyVersion();
final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
@@ -1834,7 +1771,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
- CacheGroupHolder grpHolder = groupHolder(fut, desc);
+ CacheGroupHolder grpHolder = groupHolder(fut.topologyVersion(), desc);
if (!grpHolder.rebalanceEnabled)
return;