You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/09 08:57:31 UTC
[02/14] ignite git commit: ignite-4296 Optimize
GridDhtPartitionsSingleMessage processing - optimized processing of
GridDhtPartitionsSingleMessage - minor optimizations for
RendezvousAffinityFunction - fixed heartbeats sending in tcp discovery
ignite-4296 Optimize GridDhtPartitionsSingleMessage processing
- optimized processing of GridDhtPartitionsSingleMessage
- minor optimizations for RendezvousAffinityFunction
- fixed heartbeats sending in tcp discovery
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acf20b32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acf20b32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acf20b32
Branch: refs/heads/ignite-comm-balance-master
Commit: acf20b32d8fb68e42b904b091fb2b943f4558cef
Parents: b4aedfd
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 5 14:01:28 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 5 14:01:28 2016 +0300
----------------------------------------------------------------------
.../rendezvous/RendezvousAffinityFunction.java | 80 ++++--
.../discovery/GridDiscoveryManager.java | 118 +-------
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheUtils.java | 17 --
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +-
.../dht/GridClientPartitionTopology.java | 120 ++++----
.../distributed/dht/GridDhtLocalPartition.java | 1 -
.../dht/GridDhtPartitionTopology.java | 28 +-
.../dht/GridDhtPartitionTopologyImpl.java | 284 +++++++++++--------
.../dht/preloader/GridDhtPartitionFullMap.java | 18 +-
.../GridDhtPartitionsExchangeFuture.java | 56 +++-
.../cache/transactions/IgniteTxHandler.java | 4 +-
.../service/GridServiceProcessor.java | 4 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 40 ++-
.../tcp/internal/TcpDiscoveryStatistics.java | 4 +
.../AbstractAffinityFunctionSelfTest.java | 2 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 2 +-
.../GridCachePartitionedAffinitySpreadTest.java | 7 +-
.../distributed/dht/GridCacheDhtTestUtils.java | 232 ---------------
.../h2/twostep/GridReduceQueryExecutor.java | 14 +-
20 files changed, 437 insertions(+), 603 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index ec12973..75e7c92 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,7 +17,6 @@
package org.apache.ignite.cache.affinity.rendezvous;
-import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@@ -354,46 +353,69 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/**
* Returns collection of nodes (primary first) for specified partition.
+ *
+ * @param d Message digest.
+ * @param part Partition.
+ * @param nodes Nodes.
+ * @param nodesHash Serialized nodes hashes.
+ * @param backups Number of backups.
+ * @param neighborhoodCache Neighborhood.
+ * @return Assignment.
*/
- public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+ public List<ClusterNode> assignPartition(MessageDigest d,
+ int part,
+ List<ClusterNode> nodes,
+ Map<ClusterNode, byte[]> nodesHash,
+ int backups,
@Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
if (nodes.size() <= 1)
return nodes;
- List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
+ if (d == null)
+ d = digest.get();
- MessageDigest d = digest.get();
+ List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
- for (ClusterNode node : nodes) {
- Object nodeHash = resolveNodeHash(node);
+ try {
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] nodeHashBytes = nodesHash.get(node);
- byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+ if (nodeHashBytes == null) {
+ Object nodeHash = resolveNodeHash(node);
- out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
- out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+ byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+ // Add 4 bytes for partition bytes.
+ nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+ System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+ nodesHash.put(node, nodeHashBytes);
+ }
+
+ U.intToBytes(part, nodeHashBytes, 0);
d.reset();
- byte[] bytes = d.digest(out.toByteArray());
+ byte[] bytes = d.digest(nodeHashBytes);
long hash =
- (bytes[0] & 0xFFL)
- | ((bytes[1] & 0xFFL) << 8)
- | ((bytes[2] & 0xFFL) << 16)
- | ((bytes[3] & 0xFFL) << 24)
- | ((bytes[4] & 0xFFL) << 32)
- | ((bytes[5] & 0xFFL) << 40)
- | ((bytes[6] & 0xFFL) << 48)
- | ((bytes[7] & 0xFFL) << 56);
+ (bytes[0] & 0xFFL)
+ | ((bytes[1] & 0xFFL) << 8)
+ | ((bytes[2] & 0xFFL) << 16)
+ | ((bytes[3] & 0xFFL) << 24)
+ | ((bytes[4] & 0xFFL) << 32)
+ | ((bytes[5] & 0xFFL) << 40)
+ | ((bytes[6] & 0xFFL) << 48)
+ | ((bytes[7] & 0xFFL) << 56);
lst.add(F.t(hash, node));
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
Collections.sort(lst, COMPARATOR);
@@ -474,8 +496,18 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
+ MessageDigest d = digest.get();
+
+ List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+ Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
for (int i = 0; i < parts; i++) {
- List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+ List<ClusterNode> partAssignment = assignPartition(d,
+ i,
+ nodes,
+ nodesHash,
+ affCtx.backups(),
neighborhoodCache);
assignments.add(partAssignment);
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d24f900..ddd4ee3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -42,7 +42,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -1623,17 +1623,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* Gets cache remote nodes for cache with given name.
*
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of cache nodes.
- */
- public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion());
- }
-
- /**
- * Gets cache remote nodes for cache with given name.
- *
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
@@ -1648,7 +1637,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- public Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion());
}
@@ -1659,38 +1648,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- public Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion());
}
/**
- * Gets alive remote server nodes with at least one cache configured.
- *
* @param topVer Topology version (maximum allowed node order).
- * @return Collection of alive cache nodes.
+ * @return Oldest alive server node with at least one cache configured, or {@code null} if there is none.
*/
- public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
- }
+ @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
+ DiscoCache cache = resolveDiscoCache(null, topVer);
- /**
- * Gets alive server nodes with at least one cache configured.
- *
- * @param topVer Topology version (maximum allowed node order).
- * @return Collection of alive cache nodes.
- */
- public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
- }
+ Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
- /**
- * Gets alive nodes with at least one cache configured.
- *
- * @param topVer Topology version (maximum allowed node order).
- * @return Collection of alive cache nodes.
- */
- public Collection<ClusterNode> aliveNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveNodesWithCaches(topVer.topologyVersion());
+ return e != null ? e.getKey() : null;
}
/**
@@ -2580,19 +2551,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
/**
- * Cached alive remote nodes with caches.
- */
- private final Collection<ClusterNode> aliveNodesWithCaches;
-
- /**
* Cached alive server remote nodes with caches.
*/
- private final Collection<ClusterNode> aliveSrvNodesWithCaches;
-
- /**
- * Cached alive remote server nodes with caches.
- */
- private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
+ private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
/**
* @param loc Local node.
@@ -2625,9 +2586,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
nodesByVer = new TreeMap<>();
long maxOrder0 = 0;
@@ -2681,18 +2640,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- if (hasCaches) {
- if (alive(node.id())) {
- aliveNodesWithCaches.add(node);
-
- if (!CU.clientNode(node)) {
- aliveSrvNodesWithCaches.add(node);
-
- if (!loc.id().equals(node.id()))
- aliveRmtSrvNodesWithCaches.add(node);
- }
- }
- }
+ if (hasCaches && alive(node.id()) && !CU.clientNode(node))
+ aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
IgniteProductVersion nodeVer = U.productVersion(node);
@@ -2821,17 +2770,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets all remote nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, rmtCacheNodes.get(cacheName));
- }
-
- /**
* Gets all remote nodes that have at least one cache configured.
*
* @param topVer Topology version.
@@ -2876,36 +2814,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets all alive remote server nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
- return filter(topVer, aliveRmtSrvNodesWithCaches);
- }
-
- /**
- * Gets all alive server nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
- return filter(topVer, aliveSrvNodesWithCaches);
- }
-
- /**
- * Gets all alive remote nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveNodesWithCaches(final long topVer) {
- return filter(topVer, aliveNodesWithCaches);
- }
-
- /**
* Checks if cache with given name has at least one node with near cache enabled.
*
* @param cacheName Cache name.
@@ -2928,9 +2836,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
filterNodeMap(aliveRmtCacheNodes, leftNode);
- aliveNodesWithCaches.remove(leftNode);
aliveSrvNodesWithCaches.remove(leftNode);
- aliveRmtSrvNodesWithCaches.remove(leftNode);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 503b334..7a24aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -740,7 +740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* Partition refresh callback.
*/
private void refreshPartitions() {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest == null) {
if (log.isDebugEnabled())
@@ -1224,7 +1224,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue(), null);
}
if (!cctx.kernalContext().clientNode() && updated)
@@ -1273,7 +1273,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null) {
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue(), null, true);
cctx.affinity().checkRebalanceState(top, cacheId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 90e428c..d32f4c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -490,23 +490,6 @@ public class GridCacheUtils {
}
/**
- * Gets oldest alive server node with at least one cache configured for specified topology version.
- *
- * @param ctx Context.
- * @param topVer Maximum allowed topology version.
- * @return Oldest alive cache server node.
- */
- @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
- AffinityTopologyVersion topVer) {
- Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
-
- if (nodes.isEmpty())
- return null;
-
- return oldest(nodes);
- }
-
- /**
* @param nodes Nodes.
* @return Oldest node for the given topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 568a4da..1d60c42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -304,8 +304,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
assert !metaDataCache.context().affinityNode();
while (true) {
- ClusterNode oldestSrvNode =
- CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
+ ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldestSrvNode == null)
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5efb317..816132d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -271,7 +271,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
assert oldest != null;
@@ -536,7 +536,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
- ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.gridName() + ']';
+ ", locNodeId=" + cctx.localNodeId() +
+ ", gridName=" + cctx.gridName() + ']';
GridDhtPartitionFullMap m = node2part;
@@ -549,7 +550,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -563,7 +564,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -571,7 +572,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
- return null;
+ return false;
}
updateSeq.incrementAndGet();
@@ -634,7 +635,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
- return null;
+ return false;
}
finally {
lock.writeLock().unlock();
@@ -642,10 +643,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
- Map<Integer, Long> cntrMap) {
+ Map<Integer, Long> cntrMap,
+ boolean checkEvictions) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -654,29 +655,27 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
- return null;
+ return false;
}
lock.writeLock().lock();
try {
if (stopping)
- return null;
+ return false;
if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (exchId != null)
lastExchangeId = exchId;
if (node2part == null) {
- U.dumpStack(log, "Created invalid: " + node2part);
-
// Create invalid partition map.
node2part = new GridDhtPartitionFullMap();
}
@@ -688,43 +687,45 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
- node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
-
- boolean changed = false;
+ node2part.updateSequence(updateSeq);
- if (cur == null || !cur.equals(parts))
- changed = true;
+ boolean changed = cur == null || !cur.equals(parts);
- node2part.put(parts.nodeId(), parts);
+ if (changed) {
+ node2part.put(parts.nodeId(), parts);
- part2node = new HashMap<>(part2node);
+ // Add new mappings.
+ for (Integer p : parts.keySet()) {
+ Set<UUID> ids = part2node.get(p);
- // Add new mappings.
- for (Integer p : parts.keySet()) {
- Set<UUID> ids = part2node.get(p);
+ if (ids == null)
+ // Initialize HashSet to size 3 in anticipation that there won't be
+ // more than 3 nodes per partition.
+ part2node.put(p, ids = U.newHashSet(3));
- if (ids == null)
- // Initialize HashSet to size 3 in anticipation that there won't be
- // more than 3 nodes per partition.
- part2node.put(p, ids = U.newHashSet(3));
+ ids.add(parts.nodeId());
+ }
- changed |= ids.add(parts.nodeId());
- }
+ // Remove obsolete mappings.
+ if (cur != null) {
+ for (Integer p : cur.keySet()) {
+ if (parts.containsKey(p))
+ continue;
- // Remove obsolete mappings.
- if (cur != null) {
- for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
- Set<UUID> ids = part2node.get(p);
+ Set<UUID> ids = part2node.get(p);
- if (ids != null)
- changed |= ids.remove(parts.nodeId());
+ if (ids != null)
+ ids.remove(parts.nodeId());
+ }
}
}
+ else
+ cur.updateSequence(parts.updateSequence(), parts.topologyVersion());
if (cntrMap != null) {
for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -740,13 +741,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
}
}
+ /** {@inheritDoc} */
+ @Override public void checkEvictions() {
+ // No-op.
+ }
+
/**
* Updates value for single partition.
*
@@ -755,13 +761,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
* @param state State.
* @param updateSeq Update sequence.
*/
- @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
assert lock.isWriteLockedByCurrentThread();
assert nodeId.equals(cctx.localNodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
// If this node became the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
@@ -811,7 +816,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId != null;
assert lock.writeLock().isHeldByCurrentThread();
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
ClusterNode loc = cctx.localNode();
@@ -877,18 +882,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) {
- lock.readLock().lock();
-
- try {
- return node2part.get(nodeId);
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
@Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
lock.readLock().lock();
@@ -919,6 +912,27 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public boolean hasMovingPartitions() {
+ lock.readLock().lock();
+
+ try {
+ assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
+ ", locNodeId=" + cctx.localNodeId() +
+ ", gridName=" + cctx.gridName() + ']';
+
+ for (GridDhtPartitionMap2 map : node2part.values()) {
+ if (map.hasMovingPartitions())
+ return true;
+ }
+
+ return false;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 39a3e08..668a1cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -479,7 +479,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
if ((reservations & 0xFFFF) == 0 && casState(reservations, RENTING)) {
shouldBeRenting = false;
-
if (log.isDebugEnabled())
log.debug("Moved partition to RENTING state: " + this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 0f75a5d..14ce1f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -195,6 +195,11 @@ public interface GridDhtPartitionTopology {
public GridDhtPartitionFullMap partitionMap(boolean onlyActive);
/**
+ * @return {@code True} if at least one cache node has partitions in {@link GridDhtPartitionState#MOVING} state.
+ */
+ public boolean hasMovingPartitions();
+
+ /**
* @param e Entry removed from cache.
*/
public void onRemoved(GridDhtCacheEntry e);
@@ -203,9 +208,9 @@ public interface GridDhtPartitionTopology {
* @param exchId Exchange ID.
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
- * @return Local partition map if there were evictions or {@code null} otherwise.
+ * @return {@code True} if topology state changed.
*/
- public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap);
@@ -213,11 +218,18 @@ public interface GridDhtPartitionTopology {
* @param exchId Exchange ID.
* @param parts Partitions.
* @param cntrMap Partition update counters.
- * @return Local partition map if there were evictions or {@code null} otherwise.
+ * @param checkEvictions Check evictions flag.
+ * @return {@code True} if topology state changed.
*/
- @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
- @Nullable Map<Integer, Long> cntrMap);
+ @Nullable Map<Integer, Long> cntrMap,
+ boolean checkEvictions);
+
+ /**
+ *
+ */
+ public void checkEvictions();
/**
* @param skipZeros If {@code true} then filters out zero counters.
@@ -238,12 +250,6 @@ public interface GridDhtPartitionTopology {
public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
/**
- * @param nodeId Node to get partitions for.
- * @return Partitions for node.
- */
- @Nullable public GridDhtPartitionMap2 partitions(UUID nodeId);
-
- /**
* Prints memory stats.
*
* @param threshold Threshold for number of entries.
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ab573bd..1b4dcc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -330,8 +329,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void initPartitions(
- GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException {
+ @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut)
+ throws IgniteInterruptedCheckedException
+ {
U.writeLock(lock);
try {
@@ -356,9 +356,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
ClusterNode loc = cctx.localNode();
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
- assert oldest != null || cctx.kernalContext().clientNode();
+ ClusterNode oldest = currentCoordinator();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -397,7 +395,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Owned partition for oldest node: " + locPart);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
}
}
}
@@ -419,7 +417,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state.active()) {
locPart.rent(false);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
if (log.isDebugEnabled())
log.debug("Evicting partition with rebalancing disabled " +
@@ -443,8 +441,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param updateSeq Update sequence.
*/
private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
- ClusterNode loc = cctx.localNode();
-
int num = cctx.affinity().partitions();
for (int p = 0; p < num; p++) {
@@ -454,7 +450,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// will be created in MOVING state.
GridDhtLocalPartition locPart = createPartition(p);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
}
}
// If this node's map is empty, we pre-create local partitions,
@@ -485,10 +481,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (exchId.isLeft())
removeNode(exchId.nodeId());
- // In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
- assert oldest != null || cctx.kernalContext().clientNode();
+ ClusterNode oldest = currentCoordinator();
if (log.isDebugEnabled())
log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -548,8 +541,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
boolean changed = waitForRent();
- ClusterNode loc = cctx.localNode();
-
int num = cctx.affinity().partitions();
AffinityTopologyVersion topVer = exchFut.topologyVersion();
@@ -600,7 +591,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
locPart + ']';
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
changed = true;
@@ -620,7 +611,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
locPart + ", owners = " + owners + ']');
}
else
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
}
}
else {
@@ -630,7 +621,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state == MOVING) {
locPart.rent(false);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateSeq = updateLocal(p, locPart.state(), updateSeq);
changed = true;
@@ -803,8 +794,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
map.put(i, part.state());
}
- return new GridDhtPartitionMap2(cctx.nodeId(), updateSeq.get(), topVer,
- Collections.unmodifiableMap(map), true);
+ return new GridDhtPartitionMap2(cctx.nodeId(),
+ updateSeq.get(),
+ topVer,
+ Collections.unmodifiableMap(map),
+ true);
}
finally {
lock.readLock().unlock();
@@ -985,7 +979,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -997,7 +991,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
if (stopping)
- return null;
+ return false;
if (cntrMap != null) {
for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -1025,7 +1019,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -1033,7 +1027,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
@@ -1076,7 +1070,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
node2part = partMap;
- Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f);
+ Map<Integer, Set<UUID>> p2n = U.newHashMap(cctx.affinity().partitions());
for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
for (Integer p : e.getValue().keySet()) {
@@ -1110,7 +1104,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
@@ -1118,10 +1112,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
- @Nullable Map<Integer, Long> cntrMap) {
+ @Nullable Map<Integer, Long> cntrMap,
+ boolean checkEvictions) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -1130,33 +1124,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
- return null;
+ return false;
}
lock.writeLock().lock();
try {
if (stopping)
- return null;
+ return false;
if (cntrMap != null) {
for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
- Long cntr = this.cntrMap.get(e.getKey());
+ Integer p = e.getKey();
- if (cntr == null || cntr < e.getValue())
- this.cntrMap.put(e.getKey(), e.getValue());
- }
-
- for (int i = 0; i < locParts.length(); i++) {
- GridDhtLocalPartition part = locParts.get(i);
+ Long cntr = this.cntrMap.get(p);
- if (part == null)
- continue;
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(p, e.getValue());
- Long cntr = cntrMap.get(part.id());
+ GridDhtLocalPartition part = locParts.get(p);
- if (cntr != null)
- part.updateCounter(cntr);
+ if (part != null)
+ part.updateCounter(e.getValue());
}
}
@@ -1165,7 +1154,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (exchId != null)
@@ -1182,60 +1171,91 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
- node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
-
- boolean changed = false;
+ node2part.newUpdateSequence(updateSeq);
- if (cur == null || !cur.equals(parts))
- changed = true;
+ boolean changed = cur == null || !cur.equals(parts);
- node2part.put(parts.nodeId(), parts);
+ if (changed) {
+ node2part.put(parts.nodeId(), parts);
- part2node = new HashMap<>(part2node);
+ // Add new mappings.
+ for (Integer p : parts.keySet()) {
+ Set<UUID> ids = part2node.get(p);
- // Add new mappings.
- for (Integer p : parts.keySet()) {
- Set<UUID> ids = part2node.get(p);
+ if (ids == null)
+ // Initialize HashSet to size 3 in anticipation that there won't be
+ // more than 3 nodes per partition.
+ part2node.put(p, ids = U.newHashSet(3));
- if (ids == null)
- // Initialize HashSet to size 3 in anticipation that there won't be
- // more than 3 nodes per partition.
- part2node.put(p, ids = U.newHashSet(3));
+ ids.add(parts.nodeId());
+ }
- changed |= ids.add(parts.nodeId());
- }
+ // Remove obsolete mappings.
+ if (cur != null) {
+ for (Integer p : cur.keySet()) {
+ if (parts.containsKey(p))
+ continue;
- // Remove obsolete mappings.
- if (cur != null) {
- for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
- Set<UUID> ids = part2node.get(p);
+ Set<UUID> ids = part2node.get(p);
- if (ids != null)
- changed |= ids.remove(parts.nodeId());
+ if (ids != null)
+ ids.remove(parts.nodeId());
+ }
}
}
+ else
+ cur.updateSequence(parts.updateSequence(), parts.topologyVersion());
- AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
-
- if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
-
- changed |= checkEvictions(updateSeq, aff);
-
- updateRebalanceVersion(aff);
- }
+ if (checkEvictions)
+ changed |= checkEvictions(updateSeq);
consistencyCheck();
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * @param updateSeq Update sequence.
+ * @return {@code True} if state changed.
+ */
+ private boolean checkEvictions(long updateSeq) {
+ AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+
+ boolean changed = false;
+
+ if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
+ List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+
+ changed = checkEvictions(updateSeq, aff);
+
+ updateRebalanceVersion(aff);
+ }
+
+ return changed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkEvictions() {
+ lock.writeLock().lock();
+
+ try {
+ long updateSeq = this.updateSeq.incrementAndGet();
+
+ node2part.newUpdateSequence(updateSeq);
+
+ checkEvictions(updateSeq);
}
finally {
lock.writeLock().unlock();
@@ -1270,7 +1290,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
part.rent(false);
- updateLocal(part.id(), locId, part.state(), updateSeq);
+ updateSeq = updateLocal(part.id(), part.state(), updateSeq);
changed = true;
@@ -1295,7 +1315,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (locId.equals(n.id())) {
part.rent(false);
- updateLocal(part.id(), locId, part.state(), updateSeq);
+ updateSeq = updateLocal(part.id(), part.state(), updateSeq);
changed = true;
@@ -1316,19 +1336,27 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
+ * @return Current coordinator node.
+ */
+ @Nullable private ClusterNode currentCoordinator() {
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+ assert oldest != null || cctx.kernalContext().clientNode();
+
+ return oldest;
+ }
+
+ /**
* Updates value for single partition.
*
* @param p Partition.
- * @param nodeId Node ID.
* @param state State.
* @param updateSeq Update sequence.
+ * @return Update sequence that was applied; may be greater than the value passed in.
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
- assert nodeId.equals(cctx.nodeId());
-
- // In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+ private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) {
+ ClusterNode oldest = currentCoordinator();
assert oldest != null || cctx.kernalContext().clientNode();
@@ -1338,12 +1366,16 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (seq != updateSeq) {
if (seq > updateSeq) {
- if (this.updateSeq.get() < seq) {
+ long seq0 = this.updateSeq.get();
+
+ if (seq0 < seq) {
// Update global counter if necessary.
- boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);
+ boolean b = this.updateSeq.compareAndSet(seq0, seq + 1);
- assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq +
- ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']';
+ assert b : "Invalid update sequence [updateSeq=" + updateSeq +
+ ", seq=" + seq +
+ ", curUpdateSeq=" + this.updateSeq.get() +
+ ", node2part=" + node2part.toFullString() + ']';
updateSeq = seq + 1;
}
@@ -1355,11 +1387,19 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- GridDhtPartitionMap2 map = node2part.get(nodeId);
+ UUID locNodeId = cctx.localNodeId();
+
+ GridDhtPartitionMap2 map = node2part.get(locNodeId);
- if (map == null)
- node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer,
- Collections.<Integer, GridDhtPartitionState>emptyMap(), false));
+ if (map == null) {
+ map = new GridDhtPartitionMap2(locNodeId,
+ updateSeq,
+ topVer,
+ Collections.<Integer, GridDhtPartitionState>emptyMap(),
+ false);
+
+ node2part.put(locNodeId, map);
+ }
map.updateSequence(updateSeq, topVer);
@@ -1370,7 +1410,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (ids == null)
part2node.put(p, ids = U.newHashSet(3));
- ids.add(nodeId);
+ ids.add(locNodeId);
+
+ return updateSeq;
}
/**
@@ -1395,8 +1437,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
else
node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
- part2node = new HashMap<>(part2node);
-
GridDhtPartitionMap2 parts = node2part.remove(nodeId);
if (parts != null) {
@@ -1418,13 +1458,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean own(GridDhtLocalPartition part) {
- ClusterNode loc = cctx.localNode();
-
lock.writeLock().lock();
try {
if (part.own()) {
- updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+ updateLocal(part.id(), part.state(), updateSeq.incrementAndGet());
consistencyCheck();
@@ -1452,7 +1490,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
- updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
+ updateLocal(part.id(), part.state(), seq);
consistencyCheck();
}
@@ -1462,18 +1500,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) {
- lock.readLock().lock();
-
- try {
- return node2part.get(nodeId);
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
@Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
lock.readLock().lock();
@@ -1526,6 +1552,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public boolean hasMovingPartitions() {
+ lock.readLock().lock();
+
+ try {
+ assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
+ ", cache=" + cctx.name() +
+ ", started=" + cctx.started() +
+ ", stopping=" + stopping +
+ ", locNodeId=" + cctx.localNode().id() +
+ ", locName=" + cctx.gridName() + ']';
+
+ for (GridDhtPartitionMap2 map : node2part.values()) {
+ if (map.hasMovingPartitions())
+ return true;
+ }
+
+ return false;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
@@ -1607,10 +1657,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state == match)
return true;
- if (matches != null && matches.length > 0)
- for (GridDhtPartitionState s : matches)
+ if (matches != null && matches.length > 0) {
+ for (GridDhtPartitionState s : matches) {
if (state == s)
return true;
+ }
+ }
}
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 8f5ad17..e8860f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -103,10 +103,13 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) {
GridDhtPartitionMap2 part = e.getValue();
- if (onlyActive)
- put(e.getKey(), new GridDhtPartitionMap2(part.nodeId(), part.updateSequence(), part.topologyVersion(), part.map(), true));
- else
- put(e.getKey(), part);
+ GridDhtPartitionMap2 cpy = new GridDhtPartitionMap2(part.nodeId(),
+ part.updateSequence(),
+ part.topologyVersion(),
+ part.map(),
+ onlyActive);
+
+ put(e.getKey(), cpy);
}
}
@@ -177,6 +180,13 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
/**
* @param updateSeq New update sequence value.
+ */
+ public void newUpdateSequence(long updateSeq) {
+ this.updateSeq = updateSeq;
+ }
+
+ /**
+ * @param updateSeq New update sequence value.
* @return Old update sequence value.
*/
public long updateSequence(long updateSeq) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f391265..e945de9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -112,6 +112,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */
@GridToStringExclude
+ private int pendingSingleUpdates;
+
+ /** */
+ @GridToStringExclude
private List<ClusterNode> srvNodes;
/** */
@@ -1162,13 +1166,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*/
private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
boolean allReceived = false;
+ boolean updateSingleMap = false;
synchronized (mux) {
assert crd != null;
if (crd.isLocal()) {
if (remaining.remove(node.id())) {
- updatePartitionSingleMap(msg);
+ updateSingleMap = true;
+
+ pendingSingleUpdates++;
allReceived = remaining.isEmpty();
}
@@ -1177,8 +1184,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
singleMsgs.put(node, msg);
}
- if (allReceived)
+ if (updateSingleMap) {
+ try {
+ updatePartitionSingleMap(msg);
+ }
+ finally {
+ synchronized (mux) {
+ assert pendingSingleUpdates > 0;
+
+ pendingSingleUpdates--;
+
+ if (pendingSingleUpdates == 0)
+ mux.notifyAll();
+ }
+ }
+ }
+
+ if (allReceived) {
+ awaitSingleMapUpdates();
+
onAllReceived(false);
+ }
+ }
+
+ /**
+ * Waits until the number of pending single partition map updates drops to zero.
+ */
+ private void awaitSingleMapUpdates() {
+ synchronized (mux) {
+ try {
+ while (pendingSingleUpdates > 0)
+ U.wait(mux);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+ }
+ }
}
/**
@@ -1218,6 +1259,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal())
+ cacheCtx.topology().checkEvictions();
+ }
+
updateLastVersion(cctx.versions().last());
cctx.versions().onExchange(lastVer.get().order());
@@ -1374,7 +1420,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (cacheCtx != null)
cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
else {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
@@ -1395,7 +1441,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
cctx.exchange().clientTopology(cacheId, this);
- top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+ top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId), false);
}
}
@@ -1557,6 +1603,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (crd0.isLocal()) {
if (allReceived) {
+ awaitSingleMapUpdates();
+
onAllReceived(true);
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index fbd8ce5..cf69264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -679,14 +679,14 @@ public class IgniteTxHandler {
* @param req Request.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId,
+ @Nullable private IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId,
GridNearTxFinishRequest req) {
if (txFinishMsgLog.isDebugEnabled())
txFinishMsgLog.debug("Received near finish request [txId=" + req.version() + ", node=" + nodeId + ']');
IgniteInternalFuture<IgniteInternalTx> fut = finish(nodeId, null, req);
- assert req.txState() != null || fut.error() != null ||
+ assert req.txState() != null || (fut != null && fut.error() != null) ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
return fut;
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6c26363..b9b92b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1265,7 +1265,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
try {
if (!cache.context().affinityNode()) {
ClusterNode oldestSrvNode =
- CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+ ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldestSrvNode == null)
return new GridEmptyIterator<>();
@@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
depExe.submit(new BusyRunnable() {
@Override public void run0() {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
+ ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8814745..a660ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -129,6 +129,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -174,8 +175,7 @@ class ServerImpl extends TcpDiscoveryImpl {
IgniteProductVersion.fromString("1.5.0");
/** */
- private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
+ private IgniteThreadPoolExecutor utilityPool;
/** Nodes ring. */
@GridToStringExclude
@@ -297,6 +297,13 @@ class ServerImpl extends TcpDiscoveryImpl {
spiState = DISCONNECTED;
}
+ utilityPool = new IgniteThreadPoolExecutor("disco-pool",
+ spi.ignite().name(),
+ 0,
+ 1,
+ 2000,
+ new LinkedBlockingQueue<Runnable>());
+
if (debugMode) {
if (!log.isInfoEnabled())
throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
@@ -2403,9 +2410,12 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Connection check threshold. */
private long connCheckThreshold;
+ /** Time when the last ensured ring message sent by another node was processed. */
+ private long lastRingMsgTime;
+
/**
*/
- protected RingMessageWorker() {
+ RingMessageWorker() {
super("tcp-disco-msg-worker", 10);
initConnectionCheckFrequency();
@@ -2500,6 +2510,8 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to process.
*/
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+ sendHeartbeatMessage();
+
DebugLogger log = messageLogger(msg);
if (log.isDebugEnabled())
@@ -2508,6 +2520,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+ boolean ensured = spi.ensured(msg);
+
+ if (!locNode.id().equals(msg.senderNodeId()) && ensured)
+ lastRingMsgTime = U.currentTimeMillis();
+
if (locNode.internalOrder() == 0) {
boolean proc = false;
@@ -2564,7 +2581,7 @@ class ServerImpl extends TcpDiscoveryImpl {
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
- if (spi.ensured(msg) && redirectToClients(msg))
+ if (ensured && redirectToClients(msg))
msgHist.add(msg);
if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -5336,12 +5353,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* Sends heartbeat message if needed.
*/
private void sendHeartbeatMessage() {
- if (!isLocalNodeCoordinator())
- return;
-
long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
- if (elapsed > 0)
+ if (elapsed > 0 || !isLocalNodeCoordinator())
return;
TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
@@ -5361,7 +5375,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
lastTimeStatusMsgSent = locNode.lastUpdateTime();
- long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+ long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
+
+ long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
if (elapsed > 0)
return;
@@ -6062,11 +6078,11 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
- long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
spi.getSocketTimeout();
if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -6103,7 +6119,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Local node is stopping. Remote node should try next one.
res = RES_CONTINUE_JOIN;
- spi.writeToSocket(msg, sock, res, socketTimeout);
+ spi.writeToSocket(msg, sock, res, sockTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
index 9e73632..c790644 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
@@ -317,7 +319,9 @@ public class TcpDiscoveryStatistics {
assert time >= 0 : time;
if (crdSinceTs.get() > 0 &&
+ (msg instanceof TcpDiscoveryCustomEventMessage) ||
(msg instanceof TcpDiscoveryNodeAddedMessage) ||
+ (msg instanceof TcpDiscoveryNodeAddFinishedMessage) ||
(msg instanceof TcpDiscoveryNodeLeftMessage) ||
(msg instanceof TcpDiscoveryNodeFailedMessage)) {
ringMsgsSndTs.put(msg.id(), U.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
index 878d7d1..43017db 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@ -104,6 +104,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
}
/**
+ * @param backups Number of backups.
* @throws Exception If failed.
*/
protected void checkNodeRemoved(int backups) throws Exception {
@@ -247,7 +248,6 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
}
}
-
/**
* @param assignment Assignment to verify.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 390c83e..31b4bc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@ -239,7 +239,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
GridCacheSharedContext<?, ?> ctx = k.context().cache().context();
ClusterNode oldest =
- GridCacheUtils.oldestAliveCacheServerNode(ctx, new AffinityTopologyVersion(currVer));
+ ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer));
assertNotNull(oldest);
http://git-wip-us.apache.org/repos/asf/ignite/blob/acf20b32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
index a59ca8b..2d46cf4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -76,7 +76,12 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe
Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
for (int part = 0; part < aff.getPartitions(); part++) {
- Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null);
+ Collection<ClusterNode> affNodes = aff.assignPartition(null,
+ part,
+ new ArrayList<>(nodes),
+ new HashMap<ClusterNode, byte[]>(),
+ 0,
+ null);
assertEquals(1, affNodes.size());