You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/19 10:13:30 UTC
[1/6] ignite git commit: ignite-5578
Repository: ignite
Updated Branches:
refs/heads/ignite-5578 a7cb82962 -> 3db3266c2
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dd44df9a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dd44df9a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dd44df9a
Branch: refs/heads/ignite-5578
Commit: dd44df9a76c8fbb5511a1bb415691b2e1973391d
Parents: f006bfb
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 11:39:08 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 11:39:08 2017 +0300
----------------------------------------------------------------------
.../processors/cache/CacheAffinitySharedManager.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dd44df9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 8a293ae..a59f5d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -518,11 +518,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
for (DynamicCacheDescriptor desc : startDescs) {
- CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+ if (desc.cacheConfiguration().getCacheMode() != LOCAL) {
+ CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
- assert grp != null;
+ assert grp != null;
- grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+ grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+ }
}
cctx.cache().initCacheProxies(topVer, null);
[4/6] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-5578-locJoin
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578-locJoin
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ba46cbd5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ba46cbd5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ba46cbd5
Branch: refs/heads/ignite-5578
Commit: ba46cbd59f7bade48e91c3c4d5514c3bfef9a153
Parents: 8fb60ff cde3da4
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:59:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:59:43 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 20 +++
.../communication/GridIoMessageFactory.java | 2 +-
.../MetaPageUpdatePartitionDataRecord.java | 2 +
.../cache/CacheObjectsReleaseFuture.java | 60 +++++++++
.../cache/GridCacheExplicitLockSpan.java | 10 +-
.../processors/cache/GridCacheMvccManager.java | 24 +++-
.../cache/GridCacheSharedContext.java | 4 +-
...idNearAtomicAbstractSingleUpdateRequest.java | 9 +-
...GridNearAtomicSingleUpdateInvokeRequest.java | 6 +
.../atomic/GridNearAtomicUpdateResponse.java | 2 +-
.../GridDhtPartitionsExchangeFuture.java | 30 +++--
.../persistence/GridCacheOffheapManager.java | 2 +-
.../cache/transactions/IgniteTxAdapter.java | 44 ++++++-
.../cache/transactions/IgniteTxManager.java | 5 +-
.../service/GridServiceProcessor.java | 4 +-
.../processors/service/ServiceContextImpl.java | 2 +
.../util/future/GridCompoundFuture.java | 6 +-
.../util/tostring/GridToStringBuilder.java | 2 +-
.../spi/IgniteSpiOperationTimeoutHelper.java | 8 +-
.../communication/tcp/TcpCommunicationSpi.java | 131 ++++++++++++-------
.../ignite/spi/discovery/tcp/ServerImpl.java | 72 +++++++---
.../spi/discovery/tcp/TcpDiscoverySpi.java | 16 ++-
.../ignite/internal/GridVersionSelfTest.java | 2 +
.../IgniteClientReconnectAbstractTest.java | 5 +
...UpdateNotifierPerClusterSettingSelfTest.java | 2 +
.../cache/GridCachePutAllFailoverSelfTest.java | 15 +++
.../cache/IgniteCachePutAllRestartTest.java | 15 +++
.../CacheGetInsideLockChangingTopologyTest.java | 5 +
.../GridCacheAbstractNodeRestartSelfTest.java | 12 +-
...titionEvictionDuringReadThroughSelfTest.java | 1 +
...niteBinaryMetadataUpdateNodeRestartTest.java | 10 ++
.../distributed/IgniteCacheGetRestartTest.java | 5 +
.../IgniteCacheNearRestartRollbackSelfTest.java | 15 +++
...teSynchronizationModesMultithreadedTest.java | 5 +
.../IgniteCachePutRetryAbstractSelfTest.java | 5 +
.../GridCacheReplicatedNodeRestartSelfTest.java | 2 +-
.../processors/database/BPlusTreeSelfTest.java | 4 +-
.../database/FreeListImplSelfTest.java | 4 +-
.../org/apache/ignite/spi/GridTcpForwarder.java | 26 ++++
.../tcp/TcpCommunicationSpiDropNodesTest.java | 15 +++
.../TcpCommunicationSpiFaultyClientTest.java | 20 ++-
.../IgniteCacheRestartTestSuite2.java | 2 +
.../cache/IgnitePutTxLoadBenchmark.java | 3 +-
43 files changed, 511 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ba46cbd5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 003c2f0,29c89a5..97e06bf
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@@ -870,16 -869,10 +870,16 @@@ public class GridIoMessageFactory imple
break;
- // [-3..119] [124..127] [-23..-27] [-36..-55]- this
+ case 128:
+ msg = new CacheGroupAffinityMessage();
+
+ break;
+
+
+ // [-3..119] [124..128] [-23..-27] [-36..-55]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
- // [-54..-60] - Snapshots
+ // [2048..2053] - Snapshots
default:
if (ext != null) {
for (MessageFactory factory : ext) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ba46cbd5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2c9119f,c4a4f83..4f572df
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -898,16 -896,18 +903,17 @@@ public class GridDhtPartitionsExchangeF
try {
long start = U.currentTimeMillis();
- IgniteInternalFuture fut = cctx.snapshot()
- .tryStartLocalSnapshotOperation(discoEvt);
+ IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt);
- if (fut != null)
+ if (fut != null) {
fut.get();
- long end = U.currentTimeMillis();
+ long end = U.currentTimeMillis();
- if (log.isInfoEnabled())
- log.info("Snapshot initialization completed [topVer=" + exchangeId().topologyVersion() +
- ", time=" + (end - start) + "ms]");
+ if (log.isInfoEnabled())
+ log.info("Snapshot initialization completed [topVer=" + exchangeId().topologyVersion() +
+ ", time=" + (end - start) + "ms]");
+ }
}
catch (IgniteCheckedException e) {
U.error(log, "Error while starting snapshot operation", e);
[6/6] ignite git commit: Merge branch 'ignite-5578-locJoin' into ignite-5578
Posted by sb...@apache.org.
Merge branch 'ignite-5578-locJoin' into ignite-5578
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
# modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3db3266c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3db3266c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3db3266c
Branch: refs/heads/ignite-5578
Commit: 3db3266c2290948da2cfda302a844cf64727b356
Parents: 5978213
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 13:13:21 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 13:13:21 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 46 +-------------------
.../preloader/CacheGroupAffinityMessage.java | 2 +-
2 files changed, 2 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3db3266c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1fc59bb..7f55e79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1289,7 +1289,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
});
}
- public void onJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg)
+ public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg)
throws IgniteCheckedException {
final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
@@ -1370,50 +1370,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @param fut Exchange future.
- * @param msg Message.
- */
- public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) {
- final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
-
- if (F.isEmpty(affReq))
- return;
-
- final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
-
- final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
-
- assert !F.isEmpty(joinedNodeAff) : msg;
- assert joinedNodeAff.size() >= affReq.size();
-
- forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
- @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
- if (affReq.contains(aff.groupId())) {
- assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
-
- CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
-
- assert affMsg != null;
-
- List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache());
-
- // Calculate ideal assignments.
- if (!aff.centralizedAffinityFunction())
- aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
-
- aff.initialize(fut.topologyVersion(), assignments);
-
- CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
-
- assert grp != null;
-
- grp.topology().initPartitions(fut);
- }
- }
- });
- }
-
- /**
* Called on exchange initiated by server node join.
*
* @param fut Exchange future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3db3266c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index b310308..fcfec1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -250,7 +250,7 @@ public class CacheGroupAffinityMessage implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 1;
+ return 2;
}
/** {@inheritDoc} */
[2/6] ignite git commit: ignite-5578 join
Posted by sb...@apache.org.
ignite-5578 join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/056847c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/056847c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/056847c0
Branch: refs/heads/ignite-5578
Commit: 056847c091d091d678f6c96432d00e196115c3e7
Parents: dd44df9
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:19:03 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:19:03 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 17 -----
.../preloader/CacheGroupAffinityMessage.java | 75 ++++++++------------
.../GridDhtPartitionsExchangeFuture.java | 4 +-
.../preloader/GridDhtPartitionsFullMessage.java | 37 +++-------
4 files changed, 43 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index cfc3671..80121e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2256,23 +2256,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return super.values();
}
- /**
- * @param exchangeId Exchange ID.
- * @return Future.
- */
- public synchronized GridDhtPartitionsExchangeFuture find(GridDhtPartitionExchangeId exchangeId) {
- ListIterator<GridDhtPartitionsExchangeFuture> it = listIterator(size() - 1);
-
- while (it.hasPrevious()) {
- GridDhtPartitionsExchangeFuture fut0 = it.previous();
-
- if (fut0.exchangeId().equals(exchangeId))
- return fut0;
- }
-
- return null;
- }
-
/** {@inheritDoc} */
@Override public synchronized String toString() {
return S.toString(ExchangeFutureSet.class, this, super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index 5cd5d26..ee4ef45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -45,9 +45,6 @@ public class CacheGroupAffinityMessage implements Message {
private static final long serialVersionUID = 0L;
/** */
- private int grpId;
-
- /** */
@GridDirectCollection(GridLongList.class)
private List<GridLongList> assigns;
@@ -59,12 +56,9 @@ public class CacheGroupAffinityMessage implements Message {
}
/**
- * @param grpId Group ID.
* @param assign0 Assignment.
*/
- private CacheGroupAffinityMessage(int grpId, List<List<ClusterNode>> assign0) {
- this.grpId = grpId;
-
+ private CacheGroupAffinityMessage(List<List<ClusterNode>> assign0) {
assigns = new ArrayList<>(assign0.size());
for (int i = 0; i < assign0.size(); i++) {
@@ -80,13 +74,6 @@ public class CacheGroupAffinityMessage implements Message {
}
/**
- * @return Cache group ID.
- */
- int groupId() {
- return grpId;
- }
-
- /**
* @param cctx Context.
* @param topVer Topology version.
* @param affReq Cache group IDs.
@@ -115,34 +102,46 @@ public class CacheGroupAffinityMessage implements Message {
}
/**
+ * @param assign Nodes orders.
* @param nodesByOrder Nodes by order cache.
* @param discoCache Discovery data cache.
- * @return Assignments.
+ * @return Nodes list.
*/
- List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
- List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+ public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+ List<ClusterNode> assign0 = new ArrayList<>(assign.size());
- for (int p = 0; p < assigns.size(); p++) {
- GridLongList assign = assigns.get(p);
- List<ClusterNode> assign0 = new ArrayList<>(assign.size());
+ for (int n = 0; n < assign.size(); n++) {
+ long order = assign.get(n);
- for (int n = 0; n < assign.size(); n++) {
- long order = assign.get(n);
+ ClusterNode affNode = nodesByOrder.get(order);
- ClusterNode affNode = nodesByOrder.get(order);
+ if (affNode == null) {
+ affNode = discoCache.serverNodeByOrder(order);
- if (affNode == null) {
- affNode = discoCache.serverNodeByOrder(order);
+ assert affNode != null : "Failed to find node by order [order=" + order +
+ ", topVer=" + discoCache.version() + ']';
- assert affNode != null : order;
+ nodesByOrder.put(order, affNode);
+ }
- nodesByOrder.put(order, affNode);
- }
+ assign0.add(affNode);
+ }
- assign0.add(affNode);
- }
+ return assign0;
+ }
- assignments0.add(assign0);
+ /**
+ * @param nodesByOrder Nodes by order cache.
+ * @param discoCache Discovery data cache.
+ * @return Assignments.
+ */
+ public List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
+ List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
+
+ for (int p = 0; p < assigns.size(); p++) {
+ GridLongList assign = assigns.get(p);
+
+ assignments0.add(toNodes(assign, nodesByOrder, discoCache));
}
return assignments0;
@@ -167,12 +166,6 @@ public class CacheGroupAffinityMessage implements Message {
writer.incrementState();
- case 1:
- if (!writer.writeInt("grpId", grpId))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -194,14 +187,6 @@ public class CacheGroupAffinityMessage implements Message {
reader.incrementState();
- case 1:
- grpId = reader.readInt("grpId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(CacheGroupAffinityMessage.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index fa30fa2..6da7876 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -227,6 +227,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private ExchangeContext exchCtx;
/** */
+ @GridToStringExclude
private FinishState finishState;
/**
@@ -873,7 +874,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
continue;
- if (!localJoinExchange() || grp.affinity().lastVersion().topologyVersion() > 0)
+ // It is possible affinity is not initialized yet if node joins to cluster.
+ if (grp.affinity().lastVersion().topologyVersion() > 0)
grp.topology().beforeExchange(this, !centralizedAff);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/056847c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index edc9c9e..1ea8757 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -103,8 +103,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
private transient boolean compress;
/** */
- @GridDirectCollection(CacheGroupAffinityMessage.class)
- private Collection<CacheGroupAffinityMessage> cachesAff;
+ @GridDirectMap(keyType = Integer.class, valueType = CacheGroupAffinityMessage.class)
+ private Map<Integer, CacheGroupAffinityMessage> joinedNodeAff;
/**
* Required by {@link Externalizable}.
@@ -148,37 +148,32 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
cp.partsToReload = partsToReload;
cp.partsToReloadBytes = partsToReloadBytes;
cp.topVer = topVer;
- cp.cachesAff = cachesAff;
+ cp.joinedNodeAff = joinedNodeAff;
}
/**
- * @param cachesAff Affinity.
* @return Message copy.
*/
- GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
- assert !F.isEmpty(cachesAff) : cachesAff;
-
+ GridDhtPartitionsFullMessage copy() {
GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
copyStateTo(cp);
- cp.cachesAff = cachesAff;
-
return cp;
}
/**
- * @return Affinity.
+ * @return Caches affinity for joining nodes.
*/
- @Nullable Collection<CacheGroupAffinityMessage> cachesAffinity() {
- return cachesAff;
+ @Nullable public Map<Integer, CacheGroupAffinityMessage> joinedNodeAffinity() {
+ return joinedNodeAff;
}
/**
- * @param cachesAff Affinity.
+ * @param joinedNodeAff Caches affinity for joining nodes.
*/
- void cachesAffinity(Collection<CacheGroupAffinityMessage> cachesAff) {
- this.cachesAff = cachesAff;
+ void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage> joinedNodeAff) {
+ this.joinedNodeAff = joinedNodeAff;
}
/** {@inheritDoc} */
@@ -461,11 +456,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
switch (writer.state()) {
- case 5:
- if (!writer.writeCollection("cachesAff", cachesAff, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
case 6:
if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
@@ -525,13 +515,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
return false;
switch (reader.state()) {
- case 5:
- cachesAff = reader.readCollection("cachesAff", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
case 6:
dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
[3/6] ignite git commit: ignite-5578
Posted by sb...@apache.org.
ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8fb60ffc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8fb60ffc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8fb60ffc
Branch: refs/heads/ignite-5578
Commit: 8fb60ffcff592b7128ae88151f0fa07c016e0126
Parents: 056847c
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 12:50:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 12:50:36 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 14 ++++
.../discovery/GridDiscoveryManager.java | 31 +++++++--
.../cache/CacheAffinitySharedManager.java | 46 ++++++++++++
.../preloader/CacheGroupAffinityMessage.java | 4 +-
.../GridDhtPartitionsExchangeFuture.java | 73 +++++---------------
.../preloader/GridDhtPartitionsFullMessage.java | 20 ++++--
6 files changed, 120 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index f63c5f6..1d8cfdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -85,7 +86,11 @@ public class DiscoCache {
/** */
private final IgniteProductVersion minNodeVer;
+ /** */
+ private final AffinityTopologyVersion topVer;
+
/**
+ * @param topVer Topology version.
* @param state Current cluster state.
* @param loc Local node.
* @param rmtNodes Remote nodes.
@@ -101,6 +106,7 @@ public class DiscoCache {
* @param alives Alive nodes.
*/
DiscoCache(
+ AffinityTopologyVersion topVer,
DiscoveryDataClusterState state,
ClusterNode loc,
List<ClusterNode> rmtNodes,
@@ -114,6 +120,7 @@ public class DiscoCache {
Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
Map<UUID, ClusterNode> nodeMap,
Set<UUID> alives) {
+ this.topVer = topVer;
this.state = state;
this.loc = loc;
this.rmtNodes = rmtNodes;
@@ -143,6 +150,13 @@ public class DiscoCache {
}
/**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion version() {
+ return topVer;
+ }
+
+ /**
* @return Minimum node version.
*/
public IgniteProductVersion minimumNodeVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 347f6fe..1e34f0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -620,7 +620,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ChangeGlobalStateFinishMessage stateFinishMsg = null;
if (locJoinEvt) {
- discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+ discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer),
+ ctx.state().clusterState(),
+ locNode,
+ topSnapshot);
transitionWaitFut = ctx.state().onLocalJoin(discoCache);
}
@@ -643,7 +646,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
else if (customMsg instanceof ChangeGlobalStateFinishMessage) {
ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg);
- discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+ discoCache = createDiscoCache(topSnap.get().topVer,
+ ctx.state().clusterState(),
+ locNode,
+ topSnapshot);
topSnap.set(new Snapshot(topSnap.get().topVer, discoCache));
@@ -691,7 +697,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// event notifications, since SPI notifies manager about all events from this listener.
if (verChanged) {
if (discoCache == null)
- discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot);
+ discoCache = createDiscoCache(
+ nextTopVer,
+ ctx.state().clusterState(),
+ locNode,
+ topSnapshot);
discoCacheHist.put(nextTopVer, discoCache);
@@ -761,8 +771,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
topHist.clear();
- topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
- createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
+ topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache(
+ AffinityTopologyVersion.ZERO,
+ ctx.state().clusterState(),
+ locNode,
+ Collections.<ClusterNode>emptySet())));
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
@@ -2170,12 +2183,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* Called from discovery thread.
*
+ * @param topVer Topology version.
* @param state Current state.
* @param loc Local node.
* @param topSnapshot Topology snapshot.
* @return Newly created discovery cache.
*/
- @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state,
+ @NotNull private DiscoCache createDiscoCache(
+ AffinityTopologyVersion topVer,
+ DiscoveryDataClusterState state,
ClusterNode loc,
Collection<ClusterNode> topSnapshot) {
HashSet<UUID> alives = U.newHashSet(topSnapshot.size());
@@ -2252,6 +2268,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
return new DiscoCache(
+ topVer,
state,
loc,
Collections.unmodifiableList(rmtNodes),
@@ -2394,7 +2411,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
discoWrk.addEvent(EVT_NODE_SEGMENTED,
AffinityTopologyVersion.NONE,
node,
- createDiscoCache(null, node, empty),
+ createDiscoCache(AffinityTopologyVersion.NONE, null, node, empty),
empty,
null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a59f5d7..bb27613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -51,8 +51,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -1246,6 +1248,50 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param fut Exchange future.
+ * @param msg Message.
+ */
+ public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) {
+ final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+
+ if (F.isEmpty(affReq))
+ return;
+
+ final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+ final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+
+ assert !F.isEmpty(joinedNodeAff) : msg;
+ assert joinedNodeAff.size() >= affReq.size();
+
+ forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+ if (affReq.contains(aff.groupId())) {
+ assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+
+ CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+
+ assert affMsg != null;
+
+ List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache());
+
+ // Calculate ideal assignments.
+ if (!aff.centralizedAffinityFunction())
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+
+ aff.initialize(fut.topologyVersion(), assignments);
+
+ CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+ assert grp != null;
+
+ grp.topology().initPartitions(fut);
+ }
+ }
+ });
+ }
+
+ /**
* Called on exchange initiated by server node join.
*
* @param fut Exchange future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index ee4ef45..726054d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -94,7 +94,7 @@ public class CacheGroupAffinityMessage implements Message {
if (!cachesAff.containsKey(grpId)) {
List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topVer);
- cachesAff.put(grpId, new CacheGroupAffinityMessage(grpId, assign));
+ cachesAff.put(grpId, new CacheGroupAffinityMessage(assign));
}
}
@@ -199,7 +199,7 @@ public class CacheGroupAffinityMessage implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 2;
+ return 1;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6da7876..2c9119f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1189,10 +1189,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* @param nodes Nodes.
- * @param cachesAff Affinity if was requested by some nodes.
+ * @param joinedNodeAff Affinity if was requested by some nodes.
* @throws IgniteCheckedException If failed.
*/
- private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinityMessage> cachesAff)
+ private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer, CacheGroupAffinityMessage> joinedNodeAff)
throws IgniteCheckedException {
boolean singleNode = nodes.size() == 1;
@@ -1212,15 +1212,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (ClusterNode node : nodes) {
GridDhtPartitionsFullMessage sndMsg = msg;
- if (cachesAff != null) {
+ if (joinedNodeAff != null) {
if (singleNode)
- msg.cachesAffinity(cachesAff);
+ msg.joinedNodeAffinity(joinedNodeAff);
else {
GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id());
if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) {
- if (msgWithAff == null)
- msgWithAff = msg.copyWithAffinity(cachesAff);
+ if (msgWithAff == null) {
+ msgWithAff = msg.copy();
+
+ msgWithAff.joinedNodeAffinity(joinedNodeAff);
+ }
sndMsg = msgWithAff;
}
@@ -1747,7 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
- Map<Integer, CacheGroupAffinityMessage> cachesAff = null;
+ Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
GridDhtPartitionsSingleMessage msg = e.getValue();
@@ -1770,10 +1773,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
if (affReq != null) {
- cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
+ joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
topologyVersion(),
affReq,
- cachesAff);
+ joinedNodeAff);
UUID nodeId = e.getKey();
@@ -1880,7 +1883,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (!nodes.isEmpty())
- sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null);
+ sendAllPartitions(nodes, joinedNodeAff);
onDone(exchangeId().topologyVersion(), err);
}
@@ -1919,19 +1922,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (n != null) {
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
- Collection<CacheGroupAffinityMessage> cachesAff = null;
+ Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
if (affReq != null) {
- Map<Integer, CacheGroupAffinityMessage> affMap = CacheGroupAffinityMessage.createAffinityMessages(
+ joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(
cctx,
msg.exchangeId().topologyVersion(),
affReq,
null);
-
- cachesAff = affMap.values();
}
- sendAllPartitions(F.asList(n), cachesAff);
+ sendAllPartitions(F.asList(n), joinedNodeAff);
}
}
catch (IgniteCheckedException e) {
@@ -2055,46 +2056,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
- if (localJoinExchange() && affReq != null) {
- Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
-
- Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity();
-
- assert !F.isEmpty(cachesAff) : msg;
- assert cachesAff.size() >= affReq.size();
-
- int cnt = 0;
-
- for (CacheGroupAffinityMessage aff : cachesAff) {
- if (affReq.contains(aff.groupId())) {
- CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
-
- assert grp != null : aff.groupId();
- assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion());
-
- List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache);
-
- // Calculate ideal assignments.
- if (!grp.affinity().centralizedAffinityFunction())
- grp.affinity().calculate(topologyVersion(), discoEvt, discoCache);
-
- grp.affinity().initialize(topologyVersion(), assignments);
-
- try {
- grp.topology().initPartitions(this);
- }
- catch (IgniteInterruptedCheckedException e) {
- U.warn(log, "Interrupted when initialize local partitions.");
-
- return;
- }
-
- cnt++;
- }
- }
-
- assert affReq.size() == cnt : cnt;
- }
+ if (localJoinExchange() && affReq != null)
+ cctx.affinity().onLocalJoin(this, msg);
updatePartitionFullMap(msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8fb60ffc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 1ea8757..a4258c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -456,15 +456,20 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
}
switch (writer.state()) {
+ case 5:
+ if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
case 6:
- if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+ if (!writer.writeByteArray("errsBytes", errsBytes))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByteArray("errsBytes", errsBytes))
+ if (!writer.writeMap("joinedNodeAff", joinedNodeAff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
@@ -515,9 +520,16 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
return false;
switch (reader.state()) {
+ case 5:
+ dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
case 6:
- dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+ errsBytes = reader.readByteArray("errsBytes");
if (!reader.isLastRead())
return false;
@@ -525,7 +537,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
reader.incrementState();
case 7:
- errsBytes = reader.readByteArray("errsBytes");
+ joinedNodeAff = reader.readMap("joinedNodeAff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
[5/6] ignite git commit: Merge branch 'ignite-5578-locJoin' into
ignite-5578
Posted by sb...@apache.org.
Merge branch 'ignite-5578-locJoin' into ignite-5578
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
# modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5978213e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5978213e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5978213e
Branch: refs/heads/ignite-5578
Commit: 5978213e1e8c8d81d4b33ced4e38bec1edd5cee5
Parents: a7cb829 ba46cbd
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 19 13:13:15 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 19 13:13:15 2017 +0300
----------------------------------------------------------------------
.../internal/managers/discovery/DiscoCache.java | 3 +-
.../discovery/GridDiscoveryManager.java | 5 +-
.../cache/CacheAffinitySharedManager.java | 52 ++++++++++++++++++--
.../GridCachePartitionExchangeManager.java | 17 -------
.../preloader/CacheGroupAffinityMessage.java | 2 +-
.../GridDhtPartitionsExchangeFuture.java | 7 ++-
.../preloader/GridDhtPartitionsFullMessage.java | 4 +-
7 files changed, 62 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 6ec9b73,1e34f0c..8d309ed
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@@ -697,7 -697,11 +697,7 @@@ public class GridDiscoveryManager exten
// event notifications, since SPI notifies manager about all events from this listener.
if (verChanged) {
if (discoCache == null)
- discoCache = createDiscoCache(nextTopVer, ctx.state().clusterState(), locNode, topSnapshot);
- discoCache = createDiscoCache(
- nextTopVer,
- ctx.state().clusterState(),
- locNode,
- topSnapshot);
++ discoCache = createDiscoCache(nextTopVer,ctx.state().clusterState(), locNode, topSnapshot);
discoCacheHist.put(nextTopVer, discoCache);
@@@ -767,8 -771,11 +767,8 @@@
topHist.clear();
- topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache(
- AffinityTopologyVersion.ZERO,
- ctx.state().clusterState(),
- locNode,
- Collections.<ClusterNode>emptySet())));
+ topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
- createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
++ createDiscoCache(AffinityTopologyVersion.ZERO,ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet())));
}
else if (type == EVT_CLIENT_NODE_RECONNECTED) {
assert locNode.isClient() : locNode;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index ba6a22b,bb27613..1fc59bb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@@ -1244,130 -1247,51 +1246,174 @@@ public class CacheAffinitySharedManager
return grpHolder.affinity();
}
+ public void mergeExchangesOnServerLeft(final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsFullMessage msg) {
+ final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+ final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
+ log.info("mergeExchangesOnServerLeft [topVer=" + fut.context().events().discoveryCache().version() + ']');
+
+ forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+ ExchangeDiscoveryEvents evts = fut.context().events();
+
+ Map<Integer, CacheGroupAffinityMessage> idealAffDiff = msg.idealAffinityDiff();
+
+ List<List<ClusterNode>> idealAssignment =
+ aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
+
+ CacheGroupAffinityMessage affMsg = idealAffDiff != null ? idealAffDiff.get(aff.groupId()) : null;
+
+ List<List<ClusterNode>> newAssignment;
+
+ if (affMsg != null) {
+ Map<Integer, GridLongList> diff = affMsg.assignmentsDiff();
+
+ assert !F.isEmpty(diff);
+
+ newAssignment = new ArrayList<>(idealAssignment);
+
+ for (Map.Entry<Integer, GridLongList> e : diff.entrySet()) {
+ GridLongList assign = e.getValue();
+
+ newAssignment.set(e.getKey(), CacheGroupAffinityMessage.toNodes(assign,
+ nodesByOrder,
+ evts.discoveryCache()));
+ }
+ }
+ else
+ newAssignment = idealAssignment;
+
+ aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+ }
+ });
+ }
+
+ public void onJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg)
+ throws IgniteCheckedException {
+ final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+
+ final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+ final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+
+ assert !F.isEmpty(joinedNodeAff) : msg;
+ assert joinedNodeAff.size() >= affReq.size();
+
+ forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+ if (affReq.contains(aff.groupId())) {
+ assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+
+ CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+
+ assert affMsg != null;
+
+ ExchangeDiscoveryEvents evts = fut.context().events();
+
+ List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, evts.discoveryCache());
+
+ // Calculate ideal assignments.
+ if (!aff.centralizedAffinityFunction())
+ aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache());
+
+ aff.initialize(evts.topologyVersion(), assignments);
+
+ CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+ assert grp != null;
+
+ grp.topology().initPartitions(fut);
+ }
+ }
+ });
+ }
+
+ public void mergeExchangesOnServerJoin(GridDhtPartitionsExchangeFuture fut, boolean crd)
+ throws IgniteCheckedException {
+ final ExchangeDiscoveryEvents evts = fut.context().events();
+
+ log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']');
+
+ assert evts.serverJoin() && !evts.serverLeft();
+
+ WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd);
+
+ setWaitRebalanceInfo(waitRebalanceInfo, evts.waitRebalanceEventVersion(), crd);
+ }
+
+ public Map<Integer, CacheGroupAffinityMessage> mergeExchangesInitAffinityOnServerLeft(
+ GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
+ {
+ final ExchangeDiscoveryEvents evts = fut.context().events();
+
+ assert evts.serverLeft();
+
+ log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']');
+
+ forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
+ @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = evts.topologyVersion();
+
+ CacheGroupHolder cache = groupHolder(topVer, desc);
+
+ cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache());
+ }
+ });
+
+ Map<Integer, Map<Integer, List<Long>>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(),
+ fut,
+ NODE_TO_ORDER,
+ true);
+
+ return CacheGroupAffinityMessage.createAffinityDiffMessages(diff);
+ }
+
/**
+ * @param fut Exchange future.
+ * @param msg Message.
+ */
+ public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) {
+ final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin();
+
+ if (F.isEmpty(affReq))
+ return;
+
+ final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
+
+ final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity();
+
+ assert !F.isEmpty(joinedNodeAff) : msg;
+ assert joinedNodeAff.size() >= affReq.size();
+
+ forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
+ @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
+ if (affReq.contains(aff.groupId())) {
+ assert AffinityTopologyVersion.NONE.equals(aff.lastVersion());
+
+ CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId());
+
+ assert affMsg != null;
+
+ List<List<ClusterNode>> assignments = affMsg.createAssignments(nodesByOrder, fut.discoCache());
+
+ // Calculate ideal assignments.
+ if (!aff.centralizedAffinityFunction())
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+
+ aff.initialize(fut.topologyVersion(), assignments);
+
+ CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());
+
+ assert grp != null;
+
+ grp.topology().initPartitions(fut);
+ }
+ }
+ });
+ }
+
+ /**
* Called on exchange initiated by server node join.
*
* @param fut Exchange future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 9ae4032,4f572df..17bea14
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1264,12 -1201,10 +1266,13 @@@ public class GridDhtPartitionsExchangeF
/**
* @param nodes Nodes.
+ * @param joinedNodeAff Affinity if was requested by some nodes.
* @throws IgniteCheckedException If failed.
*/
- private void sendAllPartitions(Collection<ClusterNode> nodes, Map<Integer, CacheGroupAffinityMessage> joinedNodeAff)
+ private void sendAllPartitions(Collection<ClusterNode> nodes,
+ Map<Integer, CacheGroupAffinityMessage> joinedNodeAff,
+ Map<Integer, CacheGroupAffinityMessage> idealAffDiff,
+ @Nullable GridDhtPartitionExchangeId msgExchId)
throws IgniteCheckedException {
boolean singleNode = nodes.size() == 1;
@@@ -2345,68 -2045,43 +2348,68 @@@
* @param msg Message.
*/
private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
- assert exchId.equals(msg.exchangeId()) : msg;
- assert msg.lastVersion() != null : msg;
+ try {
+ assert exchId.equals(msg.exchangeId()) : msg;
+ assert msg.lastVersion() != null : msg;
- synchronized (this) {
- if (crd == null || finishState != null)
- return;
+ synchronized (this) {
+ if (crd == null || finishState != null)
+ return;
- if (!crd.equals(node)) {
- if (log.isDebugEnabled())
- log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
- ", nodeId=" + node.id() + ']');
+ if (!crd.equals(node)) {
+ if (log.isDebugEnabled())
+ log.debug("Received full partition map from unexpected node [oldest=" + crd.id() +
+ ", nodeId=" + node.id() + ']');
- if (node.order() > crd.order())
- fullMsgs.put(node, msg);
+ if (node.order() > crd.order())
+ fullMsgs.put(node, msg);
- return;
+ return;
+ }
+
+ finishState = new FinishState(crd.id(), msg.resultTopologyVersion());
}
- finishState = new FinishState(crd.id());
- }
+ if (exchCtx.mergeExchanges()) {
+ if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) {
+ log.info("Received full message, need merge [curFut=" + topologyVersion() +
+ ", resVer=" + msg.resultTopologyVersion() + ']');
+
+ cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion());
+ }
- Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin();
+ if (localJoinExchange())
- cctx.affinity().onJoin(this, msg);
++ cctx.affinity().onLocalJoin(this, msg);
+ else {
+ if (exchCtx.events().serverLeft())
+ cctx.affinity().mergeExchangesOnServerLeft(this, msg);
+ else
+ cctx.affinity().mergeExchangesOnServerJoin(this, false);
- if (localJoinExchange() && affReq != null)
- cctx.affinity().onLocalJoin(this, msg);
+ for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+ continue;
- updatePartitionFullMap(msg);
+ grp.topology().beforeExchange(this, true);
+ }
+ }
+ }
- IgniteCheckedException err = null;
+ updatePartitionFullMap(msg);
- if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
- err = new IgniteCheckedException("Cluster state change failed");
+ IgniteCheckedException err = null;
- cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
- }
+ if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) {
+ err = new IgniteCheckedException("Cluster state change failed");
- onDone(exchId.topologyVersion(), err);
+ cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
+ }
+
+ onDone(exchCtx.events().topologyVersion(), err);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5978213e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------