You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/24 10:28:30 UTC
ignite git commit: ignite-4296
Repository: ignite
Updated Branches:
refs/heads/ignite-4296 [created] d4477e845
ignite-4296
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4477e84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4477e84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4477e84
Branch: refs/heads/ignite-4296
Commit: d4477e8456731db6acb7e669dfc89de2de0341cc
Parents: 7a47a01
Author: sboikov <sb...@gridgain.com>
Authored: Thu Nov 24 12:19:59 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 24 13:26:26 2016 +0300
----------------------------------------------------------------------
.../rendezvous/RendezvousAffinityFunction.java | 80 +++++++++----
.../discovery/GridDiscoveryManager.java | 114 ++-----------------
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheUtils.java | 17 ---
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +-
.../dht/GridClientPartitionTopology.java | 26 ++---
.../dht/GridDhtPartitionTopology.java | 8 +-
.../dht/GridDhtPartitionTopologyImpl.java | 105 +++++++++--------
.../GridDhtPartitionsExchangeFuture.java | 49 +++++++-
.../service/GridServiceProcessor.java | 4 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 41 +++++--
.../AbstractAffinityFunctionSelfTest.java | 2 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 2 +-
.../GridCachePartitionedAffinitySpreadTest.java | 7 +-
14 files changed, 227 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index ec12973..c76aae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,7 +17,6 @@
package org.apache.ignite.cache.affinity.rendezvous;
-import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@@ -354,46 +353,69 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/**
* Returns collection of nodes (primary first) for specified partition.
+ *
+ * @param d Message digest.
+ * @param part Partition.
+ * @param nodes Nodes.
+ * @param nodesHash Serialized nodes hashes.
+ * @param backups Number of backups.
+ * @param neighborhoodCache Neighborhood.
+ * @return Assignment.
*/
- public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+ public List<ClusterNode> assignPartition(MessageDigest d,
+ int part,
+ List<ClusterNode> nodes,
+ Map<ClusterNode, byte[]> nodesHash,
+ int backups,
@Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
if (nodes.size() <= 1)
return nodes;
- List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
+ if (d == null)
+ d = digest.get();
- MessageDigest d = digest.get();
+ List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
- for (ClusterNode node : nodes) {
- Object nodeHash = resolveNodeHash(node);
+ try {
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] nodeHashBytes = nodesHash.get(node);
- byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+ if (nodeHashBytes == null) {
+ Object nodeHash = resolveNodeHash(node);
- out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
- out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+ byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+ // Add 4 bytes for partition bytes.
+ nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+ System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+ nodesHash.put(node, nodeHashBytes);
+ }
+
+ U.intToBytes(part, nodeHashBytes, 0);
d.reset();
- byte[] bytes = d.digest(out.toByteArray());
+ byte[] bytes = d.digest(nodeHashBytes);
long hash =
- (bytes[0] & 0xFFL)
- | ((bytes[1] & 0xFFL) << 8)
- | ((bytes[2] & 0xFFL) << 16)
- | ((bytes[3] & 0xFFL) << 24)
- | ((bytes[4] & 0xFFL) << 32)
- | ((bytes[5] & 0xFFL) << 40)
- | ((bytes[6] & 0xFFL) << 48)
- | ((bytes[7] & 0xFFL) << 56);
+ (bytes[0] & 0xFFL)
+ | ((bytes[1] & 0xFFL) << 8)
+ | ((bytes[2] & 0xFFL) << 16)
+ | ((bytes[3] & 0xFFL) << 24)
+ | ((bytes[4] & 0xFFL) << 32)
+ | ((bytes[5] & 0xFFL) << 40)
+ | ((bytes[6] & 0xFFL) << 48)
+ | ((bytes[7] & 0xFFL) << 56);
lst.add(F.t(hash, node));
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
Collections.sort(lst, COMPARATOR);
@@ -474,8 +496,18 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
+ MessageDigest d = digest.get();
+
+ List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+ Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
for (int i = 0; i < parts; i++) {
- List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+ List<ClusterNode> partAssignment = assignPartition(d,
+ i,
+ nodes,
+ nodesHash,
+ affCtx.backups(),
neighborhoodCache);
assignments.add(partAssignment);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d24f900..488dabe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -42,7 +42,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -1623,17 +1623,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* Gets cache remote nodes for cache with given name.
*
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of cache nodes.
- */
- public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion());
- }
-
- /**
- * Gets cache remote nodes for cache with given name.
- *
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
@@ -1664,33 +1653,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets alive remote server nodes with at least one cache configured.
- *
* @param topVer Topology version (maximum allowed node order).
- * @return Collection of alive cache nodes.
+ * @return Oldest alive server node with at least one cache configured, or {@code null} if there is none.
*/
- public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
- }
+ @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
+ DiscoCache cache = resolveDiscoCache(null, topVer);
- /**
- * Gets alive server nodes with at least one cache configured.
- *
- * @param topVer Topology version (maximum allowed node order).
- * @return Collection of alive cache nodes.
- */
- public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
- }
+ Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
- /**
- * Gets alive nodes with at least one cache configured.
- *
- * @param topVer Topology version (maximum allowed node order).
- * @return Collection of alive cache nodes.
- */
- public Collection<ClusterNode> aliveNodesWithCaches(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).aliveNodesWithCaches(topVer.topologyVersion());
+ return e != null ? e.getKey() : null;
}
/**
@@ -2580,19 +2551,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
/**
- * Cached alive remote nodes with caches.
- */
- private final Collection<ClusterNode> aliveNodesWithCaches;
-
- /**
* Cached alive server remote nodes with caches.
*/
- private final Collection<ClusterNode> aliveSrvNodesWithCaches;
-
- /**
- * Cached alive remote server nodes with caches.
- */
- private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
+ private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
/**
* @param loc Local node.
@@ -2625,9 +2586,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
- aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+ aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
nodesByVer = new TreeMap<>();
long maxOrder0 = 0;
@@ -2681,18 +2640,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
- if (hasCaches) {
- if (alive(node.id())) {
- aliveNodesWithCaches.add(node);
-
- if (!CU.clientNode(node)) {
- aliveSrvNodesWithCaches.add(node);
-
- if (!loc.id().equals(node.id()))
- aliveRmtSrvNodesWithCaches.add(node);
- }
- }
- }
+ if (hasCaches && alive(node.id()) && !CU.clientNode(node))
+ aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
IgniteProductVersion nodeVer = U.productVersion(node);
@@ -2821,17 +2770,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets all remote nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, rmtCacheNodes.get(cacheName));
- }
-
- /**
* Gets all remote nodes that have at least one cache configured.
*
* @param topVer Topology version.
@@ -2876,36 +2814,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets all alive remote server nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
- return filter(topVer, aliveRmtSrvNodesWithCaches);
- }
-
- /**
- * Gets all alive server nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
- return filter(topVer, aliveSrvNodesWithCaches);
- }
-
- /**
- * Gets all alive remote nodes with at least one cache configured.
- *
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> aliveNodesWithCaches(final long topVer) {
- return filter(topVer, aliveNodesWithCaches);
- }
-
- /**
* Checks if cache with given name has at least one node with near cache enabled.
*
* @param cacheName Cache name.
@@ -2928,9 +2836,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
filterNodeMap(aliveRmtCacheNodes, leftNode);
- aliveNodesWithCaches.remove(leftNode);
aliveSrvNodesWithCaches.remove(leftNode);
- aliveRmtSrvNodesWithCaches.remove(leftNode);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 503b334..5651e58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -740,7 +740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* Partition refresh callback.
*/
private void refreshPartitions() {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest == null) {
if (log.isDebugEnabled())
@@ -1224,7 +1224,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue(), null);
}
if (!cctx.kernalContext().clientNode() && updated)
@@ -1273,7 +1273,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null) {
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue(), null);
cctx.affinity().checkRebalanceState(top, cacheId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 90e428c..d32f4c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -490,23 +490,6 @@ public class GridCacheUtils {
}
/**
- * Gets oldest alive server node with at least one cache configured for specified topology version.
- *
- * @param ctx Context.
- * @param topVer Maximum allowed topology version.
- * @return Oldest alive cache server node.
- */
- @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
- AffinityTopologyVersion topVer) {
- Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
-
- if (nodes.isEmpty())
- return null;
-
- return oldest(nodes);
- }
-
- /**
* @param nodes Nodes.
* @return Oldest node for the given topology version.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 568a4da..1d60c42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -304,8 +304,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
assert !metaDataCache.context().affinityNode();
while (true) {
- ClusterNode oldestSrvNode =
- CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
+ ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldestSrvNode == null)
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5efb317..f2ee758 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -271,7 +271,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
assert oldest != null;
@@ -549,7 +549,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -563,7 +563,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -571,7 +571,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
- return null;
+ return false;
}
updateSeq.incrementAndGet();
@@ -634,7 +634,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
- return null;
+ return false;
}
finally {
lock.writeLock().unlock();
@@ -643,7 +643,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -654,21 +654,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
- return null;
+ return false;
}
lock.writeLock().lock();
try {
if (stopping)
- return null;
+ return false;
if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (exchId != null)
@@ -688,7 +688,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
@@ -740,7 +740,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
@@ -761,7 +761,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId.equals(cctx.localNodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
// If this node became the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
@@ -811,7 +811,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
assert nodeId != null;
assert lock.writeLock().isHeldByCurrentThread();
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
ClusterNode loc = cctx.localNode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 0f75a5d..33a6fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -203,9 +203,9 @@ public interface GridDhtPartitionTopology {
* @param exchId Exchange ID.
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
- * @return Local partition map if there were evictions or {@code null} otherwise.
+ * @return {@code True} if topology state changed.
*/
- public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap);
@@ -213,9 +213,9 @@ public interface GridDhtPartitionTopology {
* @param exchId Exchange ID.
* @param parts Partitions.
* @param cntrMap Partition update counters.
- * @return Local partition map if there were evictions or {@code null} otherwise.
+ * @return {@code True} if topology state changed.
*/
- @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
@Nullable Map<Integer, Long> cntrMap);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ab573bd..24ff3ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -340,7 +340,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
- initPartitions0(exchFut, updateSeq);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+ initPartitions0(oldest, exchFut, updateSeq);
consistencyCheck();
}
@@ -350,14 +352,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
+ * @param oldest Oldest server node.
* @param exchFut Exchange future.
* @param updateSeq Update sequence.
*/
- private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+ private void initPartitions0(ClusterNode oldest, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
ClusterNode loc = cctx.localNode();
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
assert oldest != null || cctx.kernalContext().clientNode();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -397,12 +398,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Owned partition for oldest node: " + locPart);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
}
}
}
else
- createPartitions(aff, updateSeq);
+ createPartitions(oldest, aff, updateSeq);
}
else {
// If preloader is disabled, then we simply clear out
@@ -419,7 +420,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state.active()) {
locPart.rent(false);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
if (log.isDebugEnabled())
log.debug("Evicting partition with rebalancing disabled " +
@@ -433,7 +434,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
if (node2part != null && node2part.valid())
- checkEvictions(updateSeq, aff);
+ checkEvictions(oldest, updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -442,7 +443,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param aff Affinity assignments.
* @param updateSeq Update sequence.
*/
- private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
+ private void createPartitions(ClusterNode oldest, List<List<ClusterNode>> aff, long updateSeq) {
ClusterNode loc = cctx.localNode();
int num = cctx.affinity().partitions();
@@ -454,7 +455,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// will be created in MOVING state.
GridDhtLocalPartition locPart = createPartition(p);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
}
}
// If this node's map is empty, we pre-create local partitions,
@@ -486,7 +487,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
removeNode(exchId.nodeId());
// In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
assert oldest != null || cctx.kernalContext().clientNode();
@@ -523,11 +524,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
if (affReady)
- initPartitions0(exchFut, updateSeq);
+ initPartitions0(oldest, exchFut, updateSeq);
else {
List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
- createPartitions(aff, updateSeq);
+ createPartitions(oldest, aff, updateSeq);
}
consistencyCheck();
@@ -574,6 +575,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
for (int p = 0; p < num; p++) {
GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
@@ -600,7 +603,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
locPart + ']';
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
changed = true;
@@ -620,7 +623,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
locPart + ", owners = " + owners + ']');
}
else
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
}
}
else {
@@ -630,7 +633,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (state == MOVING) {
locPart.rent(false);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
changed = true;
@@ -985,7 +988,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -997,7 +1000,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
if (stopping)
- return null;
+ return false;
if (cntrMap != null) {
for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -1025,7 +1028,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -1033,7 +1036,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
@@ -1100,7 +1103,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
- changed = checkEvictions(updateSeq, aff);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+ changed = checkEvictions(oldest, updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -1110,7 +1115,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
@@ -1119,7 +1124,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -1130,33 +1135,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
- return null;
+ return false;
}
lock.writeLock().lock();
try {
if (stopping)
- return null;
+ return false;
if (cntrMap != null) {
for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
- Long cntr = this.cntrMap.get(e.getKey());
+ Integer p = e.getKey();
- if (cntr == null || cntr < e.getValue())
- this.cntrMap.put(e.getKey(), e.getValue());
- }
+ Long cntr = this.cntrMap.get(p);
- for (int i = 0; i < locParts.length(); i++) {
- GridDhtLocalPartition part = locParts.get(i);
-
- if (part == null)
- continue;
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(p, e.getValue());
- Long cntr = cntrMap.get(part.id());
+ GridDhtLocalPartition part = locParts.get(p);
- if (cntr != null)
- part.updateCounter(cntr);
+ if (part != null)
+ part.updateCounter(e.getValue());
}
}
@@ -1165,7 +1165,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (exchId != null)
@@ -1182,7 +1182,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
@@ -1225,7 +1225,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
- changed |= checkEvictions(updateSeq, aff);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+ changed |= checkEvictions(oldest, updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -1235,7 +1237,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
@@ -1243,11 +1245,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/**
+ * @param oldest Oldest server node.
* @param updateSeq Update sequence.
* @param aff Affinity assignments.
* @return Checks if any of the local partitions need to be evicted.
*/
- private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
+ private boolean checkEvictions(ClusterNode oldest, long updateSeq, List<List<ClusterNode>> aff) {
boolean changed = false;
UUID locId = cctx.nodeId();
@@ -1270,7 +1273,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
part.rent(false);
- updateLocal(part.id(), locId, part.state(), updateSeq);
+ updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
changed = true;
@@ -1295,7 +1298,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (locId.equals(n.id())) {
part.rent(false);
- updateLocal(part.id(), locId, part.state(), updateSeq);
+ updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
changed = true;
@@ -1318,18 +1321,16 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/**
* Updates value for single partition.
*
+ * @param oldest Oldest server node.
* @param p Partition.
* @param nodeId Node ID.
* @param state State.
* @param updateSeq Update sequence.
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
+ private void updateLocal(ClusterNode oldest, int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
assert nodeId.equals(cctx.nodeId());
- // In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
assert oldest != null || cctx.kernalContext().clientNode();
// If this node became the oldest node.
@@ -1424,7 +1425,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
try {
if (part.own()) {
- updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+ updateLocal(oldest, part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
consistencyCheck();
@@ -1452,7 +1455,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
- updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+ updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), seq);
consistencyCheck();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f391265..704c654 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -112,6 +112,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */
@GridToStringExclude
+ private int pendingSingleUpdates;
+
+ /** */
+ @GridToStringExclude
private List<ClusterNode> srvNodes;
/** */
@@ -1162,13 +1166,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
*/
private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
boolean allReceived = false;
+ boolean updateSingleMap = false;
synchronized (mux) {
assert crd != null;
if (crd.isLocal()) {
if (remaining.remove(node.id())) {
- updatePartitionSingleMap(msg);
+ updateSingleMap = true;
+
+ pendingSingleUpdates++;
allReceived = remaining.isEmpty();
}
@@ -1177,8 +1184,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
singleMsgs.put(node, msg);
}
- if (allReceived)
+ if (updateSingleMap) {
+ try {
+ updatePartitionSingleMap(msg);
+ }
+ finally {
+ synchronized (mux) {
+ assert pendingSingleUpdates > 0;
+
+ pendingSingleUpdates--;
+
+ if (pendingSingleUpdates == 0)
+ mux.notifyAll();
+ }
+ }
+ }
+
+ if (allReceived) {
+ awaitSingleMapUpdates();
+
onAllReceived(false);
+ }
+ }
+
+ /**
+ * Waits on {@code mux} until {@code pendingSingleUpdates} reaches zero; logs a warning and returns if interrupted.
+ */
+ private void awaitSingleMapUpdates() {
+ synchronized (mux) {
+ try {
+ while (pendingSingleUpdates > 0)
+ U.wait(mux);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+ }
+ }
}
/**
@@ -1374,7 +1415,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (cacheCtx != null)
cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
else {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+ ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal())
cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
@@ -1557,6 +1598,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (crd0.isLocal()) {
if (allReceived) {
+ awaitSingleMapUpdates();
+
onAllReceived(true);
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6c26363..b9b92b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1265,7 +1265,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
try {
if (!cache.context().affinityNode()) {
ClusterNode oldestSrvNode =
- CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+ ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldestSrvNode == null)
return new GridEmptyIterator<>();
@@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
depExe.submit(new BusyRunnable() {
@Override public void run0() {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
+ ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8814745..0e90418 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -129,6 +129,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -136,6 +137,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -174,8 +176,7 @@ class ServerImpl extends TcpDiscoveryImpl {
IgniteProductVersion.fromString("1.5.0");
/** */
- private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
+ private IgniteThreadPoolExecutor utilityPool;
/** Nodes ring. */
@GridToStringExclude
@@ -297,6 +298,13 @@ class ServerImpl extends TcpDiscoveryImpl {
spiState = DISCONNECTED;
}
+ utilityPool = new IgniteThreadPoolExecutor("disco-pool",
+ spi.ignite().name(),
+ 0,
+ 1,
+ 2000,
+ new LinkedBlockingQueue<Runnable>());
+
if (debugMode) {
if (!log.isInfoEnabled())
throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
@@ -2403,9 +2411,12 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Connection check threshold. */
private long connCheckThreshold;
+ /** Time (ms) when an ensured message whose sender is not the local node was last processed. */
+ private long lastRingMsgTime;
+
/**
*/
- protected RingMessageWorker() {
+ RingMessageWorker() {
super("tcp-disco-msg-worker", 10);
initConnectionCheckFrequency();
@@ -2500,6 +2511,8 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message to process.
*/
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+ sendHeartbeatMessage();
+
DebugLogger log = messageLogger(msg);
if (log.isDebugEnabled())
@@ -2508,6 +2521,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (debugMode)
debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+ boolean ensured = spi.ensured(msg);
+
+ if (!locNode.id().equals(msg.senderNodeId()) && ensured)
+ lastRingMsgTime = U.currentTimeMillis();
+
if (locNode.internalOrder() == 0) {
boolean proc = false;
@@ -2564,7 +2582,7 @@ class ServerImpl extends TcpDiscoveryImpl {
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
- if (spi.ensured(msg) && redirectToClients(msg))
+ if (ensured && redirectToClients(msg))
msgHist.add(msg);
if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -5336,12 +5354,9 @@ class ServerImpl extends TcpDiscoveryImpl {
* Sends heartbeat message if needed.
*/
private void sendHeartbeatMessage() {
- if (!isLocalNodeCoordinator())
- return;
-
long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
- if (elapsed > 0)
+ if (elapsed > 0 || !isLocalNodeCoordinator())
return;
TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
@@ -5361,7 +5376,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
lastTimeStatusMsgSent = locNode.lastUpdateTime();
- long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+ long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
+
+ long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
if (elapsed > 0)
return;
@@ -6062,11 +6079,11 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoverySpiState state = spiStateCopy();
- long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
spi.getSocketTimeout();
if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -6103,7 +6120,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Local node is stopping. Remote node should try next one.
res = RES_CONTINUE_JOIN;
- spi.writeToSocket(msg, sock, res, socketTimeout);
+ spi.writeToSocket(msg, sock, res, sockTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
index 878d7d1..43017db 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@ -104,6 +104,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
}
/**
+ * @param backups Number of backups.
* @throws Exception If failed.
*/
protected void checkNodeRemoved(int backups) throws Exception {
@@ -247,7 +248,6 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
}
}
-
/**
* @param assignment Assignment to verify.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 390c83e..31b4bc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@ -239,7 +239,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
GridCacheSharedContext<?, ?> ctx = k.context().cache().context();
ClusterNode oldest =
- GridCacheUtils.oldestAliveCacheServerNode(ctx, new AffinityTopologyVersion(currVer));
+ ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer));
assertNotNull(oldest);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
index a59ca8b..2d46cf4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -76,7 +76,12 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe
Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
for (int part = 0; part < aff.getPartitions(); part++) {
- Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null);
+ Collection<ClusterNode> affNodes = aff.assignPartition(null,
+ part,
+ new ArrayList<>(nodes),
+ new HashMap<ClusterNode, byte[]>(),
+ 0,
+ null);
assertEquals(1, affNodes.size());