You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2016/12/28 14:05:15 UTC
[49/50] [abbrv] ignite git commit: cheats
cheats
* 1. add partition mapping to near atomic update response (GridNearAtomicUpdateResponse)
3. avoid topology lock
4. avoid contention on version generation
* 6. org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache#unlockEntries
* 7. org.apache.ignite.internal.processors.cache.GridCacheEvictionManager#touch(org.apache.ignite.internal.processors.cache.GridCacheEntryEx, org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion)
8. org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture#mapOnTopology - cache.topology().readLock();
*9. org.apache.ignite.internal.processors.cache.GridCacheUtils#affinityNodes(org.apache.ignite.internal.processors.cache.GridCacheContext, org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion)
*10. org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DiscoCache#cacheAffinityNodes - change to cache id
* NOT FULLY - 11. org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager#next(long, boolean, boolean, byte)
12. org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl#nodes(int, org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion) - try replace with array for benchmark
postpone
2. avoid double context switch
5. put atomic futures per partition
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0706f1af
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0706f1af
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0706f1af
Branch: refs/heads/ignite-comm-balance-master
Commit: 0706f1afff2febb0546e403dec46329336e03ba7
Parents: 855399a
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Wed Dec 28 16:48:52 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Wed Dec 28 16:48:52 2016 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 2 +-
.../discovery/GridDiscoveryManager.java | 257 +++++--------------
.../processors/cache/GridCacheUtils.java | 6 +-
.../cache/affinity/GridCacheAffinityImpl.java | 2 +-
.../dht/GridClientPartitionTopology.java | 14 +-
.../distributed/dht/GridDhtCacheAdapter.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 17 +-
.../dht/atomic/GridDhtAtomicCache.java | 25 +-
.../GridNearAtomicSingleUpdateFuture.java | 78 +++---
.../dht/atomic/GridNearAtomicUpdateFuture.java | 79 +++---
.../atomic/GridNearAtomicUpdateResponse.java | 35 ++-
.../cache/version/GridCacheVersion.java | 2 +-
.../cache/version/GridCacheVersionManager.java | 54 +++-
.../clock/GridClockSyncProcessor.java | 54 ++--
.../GridDiscoveryManagerAliveCacheSelfTest.java | 58 +----
.../discovery/GridDiscoveryManagerSelfTest.java | 2 +-
16 files changed, 294 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 65ebd8d..5a26187 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -924,4 +924,4 @@ public class MessageCodeGenerator {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9aa4db1..9ecd78e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.managers.discovery;
-import java.io.Externalizable;
import java.io.Serializable;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
@@ -144,9 +143,6 @@ import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP;
* Discovery SPI manager.
*/
public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
- /** Fake key for {@code null}-named caches. Used inside {@link DiscoCache}. */
- private static final String NULL_CACHE_NAME = UUID.randomUUID().toString();
-
/** Metrics update frequency. */
private static final long METRICS_UPDATE_FREQ = 3000;
@@ -1577,7 +1573,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).allNodes();
+ return resolveDiscoCache(topVer).allNodes();
}
/**
@@ -1585,7 +1581,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return All server nodes for given topology version.
*/
public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).srvNodes;
+ return resolveDiscoCache(topVer).srvNodes;
}
/**
@@ -1596,7 +1592,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Node.
*/
public ClusterNode node(AffinityTopologyVersion topVer, UUID id) {
- return resolveDiscoCache(null, topVer).node(id);
+ return resolveDiscoCache(topVer).node(id);
}
/**
@@ -1607,49 +1603,38 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName, topVer.topologyVersion());
+ return resolveDiscoCache(topVer).cacheNodes(cacheName, topVer.topologyVersion());
}
/**
- * Gets all nodes with at least one cache configured.
+ * Gets cache nodes for cache with given ID.
*
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).allNodesWithCaches(topVer.topologyVersion());
+ public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(topVer).cacheNodes(cacheId, topVer.topologyVersion());
}
/**
- * Gets cache remote nodes for cache with given name.
- *
- * @param topVer Topology version.
- * @return Collection of cache nodes.
- */
- public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).remoteCacheNodes(topVer.topologyVersion());
- }
-
- /**
- * Gets cache nodes for cache with given name.
+ * Gets all nodes with at least one cache configured.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion());
+ public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(topVer).allNodesWithCaches(topVer.topologyVersion());
}
/**
* Gets cache remote nodes for cache with given name.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion());
+ public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(topVer).remoteCacheNodes(topVer.topologyVersion());
}
/**
@@ -1657,7 +1642,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Oldest alive server nodes with at least one cache configured.
*/
@Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
- DiscoCache cache = resolveDiscoCache(null, topVer);
+ DiscoCache cache = resolveDiscoCache(topVer);
Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
@@ -1672,7 +1657,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache affinity nodes.
*/
public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName, topVer.topologyVersion());
+ return resolveDiscoCache(topVer).cacheAffinityNodes(CU.cacheId(cacheName), topVer.topologyVersion());
+ }
+
+ /**
+ * Gets cache nodes for cache with given ID that participate in affinity calculation.
+ *
+ * @param cacheId Cache ID.
+ * @param topVer Topology version.
+ * @return Collection of cache affinity nodes.
+ */
+ public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
}
/**
@@ -1749,25 +1745,34 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return {@code True} if cache with given name has at least one node with near cache enabled.
*/
public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).hasNearCache(cacheName);
+ return resolveDiscoCache(topVer).hasNearCache(CU.cacheId(cacheName));
+ }
+
+ /**
+ * Checks if cache with given ID has at least one node with near cache enabled.
+ *
+ * @param cacheId Cache ID.
+ * @param topVer Topology version.
+ * @return {@code True} if cache with given ID has at least one node with near cache enabled.
+ */
+ public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(topVer).hasNearCache(cacheId);
}
/**
* Gets discovery cache for given topology version.
*
- * @param cacheName Cache name (participates in exception message).
* @param topVer Topology version.
* @return Discovery cache.
*/
- private DiscoCache resolveDiscoCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+ private DiscoCache resolveDiscoCache(AffinityTopologyVersion topVer) {
Snapshot snap = topSnap.get();
DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ?
snap.discoCache : discoCacheHist.get(topVer);
if (cache == null) {
- throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName +
- ", topVer=" + topVer +
+ throw new IgniteException("Failed to resolve nodes topology [topVer=" + topVer +
", history=" + discoCacheHist.keySet() +
", snap=" + snap +
", locNode=" + ctx.discovery().localNode() + ']');
@@ -2093,19 +2098,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
evts.add(new GridTuple5<>(type, topVer, node, topSnapshot, data));
}
- /**
- * @param node Node to get a short description for.
- * @return Short description for the node to be used in 'quiet' mode.
- */
- private String quietNode(ClusterNode node) {
- assert node != null;
-
- return "nodeId8=" + node.id().toString().substring(0, 8) + ", " +
- "addrs=" + U.addressesAsString(node) + ", " +
- "order=" + node.order() + ", " +
- "CPUs=" + node.metrics().getTotalCpus();
- }
-
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
while (!isCancelled()) {
@@ -2415,11 +2407,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Topology await version. */
private long awaitVer;
- /** Empty constructor required by {@link Externalizable}. */
- private DiscoTopologyFuture() {
- // No-op.
- }
-
/**
* @param ctx Context.
* @param awaitVer Await version.
@@ -2509,19 +2496,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Cache nodes by cache name. */
@GridToStringInclude
- private final Map<String, Collection<ClusterNode>> allCacheNodes;
-
- /** Remote cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
+ private final Map<Integer, Collection<ClusterNode>> allCacheNodes;
/** Cache nodes by cache name. */
@GridToStringInclude
- private final Map<String, Collection<ClusterNode>> affCacheNodes;
+ private final Map<Integer, Collection<ClusterNode>> affCacheNodes;
/** Caches where at least one node has near cache enabled. */
@GridToStringInclude
- private final Set<String> nearEnabledCaches;
+ private final Set<Integer> nearEnabledCaches;
/** Nodes grouped by version. */
private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer;
@@ -2539,18 +2522,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final long maxOrder;
/**
- * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes;
-
- /**
- * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
-
- /**
* Cached alive server remote nodes with caches.
*/
private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
@@ -2578,20 +2549,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
allNodes = Collections.unmodifiableList(all);
- Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f);
- Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
- Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
+ Map<Integer, Collection<ClusterNode>> cacheMap = U.newHashMap(allNodes.size());
+ Map<Integer, Collection<ClusterNode>> dhtNodesMap = U.newHashMap(allNodes.size());
+ Collection<ClusterNode> nodesWithCaches = U.newHashSet(allNodes.size());
+ Collection<ClusterNode> rmtNodesWithCaches = U.newHashSet(allNodes.size());
- aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
nodesByVer = new TreeMap<>();
long maxOrder0 = 0;
- Set<String> nearEnabledSet = new HashSet<>();
+ Set<Integer> nearEnabledSet = new HashSet<>();
List<ClusterNode> srvNodes = new ArrayList<>();
@@ -2620,21 +2588,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
addToMap(cacheMap, cacheName, node);
- if (alive(node.id()))
- addToMap(aliveCacheNodes, maskNull(cacheName), node);
-
if (filter.dataNode(node))
addToMap(dhtNodesMap, cacheName, node);
if (filter.nearNode(node))
- nearEnabledSet.add(cacheName);
-
- if (!loc.id().equals(node.id())) {
- addToMap(rmtCacheMap, cacheName, node);
-
- if (alive(node.id()))
- addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
- }
+ nearEnabledSet.add(CU.cacheId(cacheName));
hasCaches = true;
}
@@ -2674,7 +2632,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
maxOrder = maxOrder0;
allCacheNodes = Collections.unmodifiableMap(cacheMap);
- rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
@@ -2684,7 +2641,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
daemonNodes = Collections.unmodifiableList(new ArrayList<>(
F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON))));
- Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f);
+ Map<UUID, ClusterNode> nodeMap = U.newHashMap(allNodes().size() + daemonNodes.size());
for (ClusterNode n : F.concat(false, allNodes(), daemonNodes()))
nodeMap.put(n.id(), n);
@@ -2699,13 +2656,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param cacheName Cache name.
* @param rich Node to add
*/
- private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
- Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName);
+ private void addToMap(Map<Integer, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ Collection<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
if (cacheNodes == null) {
cacheNodes = new ArrayList<>(allNodes.size());
- cacheMap.put(cacheName, cacheNodes);
+ cacheMap.put(CU.cacheId(cacheName), cacheNodes);
}
cacheNodes.add(rich);
@@ -2727,28 +2684,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets collection of nodes which have version equal or greater than {@code ver}.
- *
- * @param ver Version to check.
- * @return Collection of nodes with version equal or greater than {@code ver}.
- */
- Collection<ClusterNode> elderNodes(IgniteProductVersion ver) {
- Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver);
-
- if (entry == null)
- return Collections.emptyList();
-
- return entry.getValue();
- }
-
- /**
- * @return Versions map.
- */
- NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() {
- return nodesByVer;
- }
-
- /**
* Gets collection of nodes with at least one cache configured.
*
* @param topVer Topology version (maximum allowed node order).
@@ -2766,61 +2701,50 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of nodes.
*/
Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, allCacheNodes.get(cacheName));
+ return filter(topVer, allCacheNodes.get(CU.cacheId(cacheName)));
}
/**
- * Gets all remote nodes that have at least one cache configured.
+ * Gets all nodes that have cache with given ID.
*
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> remoteCacheNodes(final long topVer) {
- return filter(topVer, rmtNodesWithCaches);
+ Collection<ClusterNode> cacheNodes(Integer cacheId, final long topVer) {
+ return filter(topVer, allCacheNodes.get(cacheId));
}
/**
- * Gets all nodes that have cache with given name and should participate in affinity calculation. With
- * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, affCacheNodes.get(cacheName));
- }
-
- /**
- * Gets all alive nodes that have cache with given name.
+ * Gets all remote nodes that have at least one cache configured.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveCacheNodes.get(maskNull(cacheName)));
+ Collection<ClusterNode> remoteCacheNodes(final long topVer) {
+ return filter(topVer, rmtNodesWithCaches);
}
/**
- * Gets all alive remote nodes that have cache with given name.
+ * Gets all nodes that have cache with given ID and should participate in affinity calculation. With
+ * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
*
- * @param cacheName Cache name.
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName)));
+ Collection<ClusterNode> cacheAffinityNodes(int cacheId, final long topVer) {
+ return filter(topVer, affCacheNodes.get(cacheId));
}
/**
- * Checks if cache with given name has at least one node with near cache enabled.
+ * Checks if cache with given ID has at least one node with near cache enabled.
*
- * @param cacheName Cache name.
+ * @param cacheId Cache ID.
* @return {@code True} if cache with given name has at least one node with near cache enabled.
*/
- boolean hasNearCache(@Nullable String cacheName) {
- return nearEnabledCaches.contains(cacheName);
+ boolean hasNearCache(int cacheId) {
+ return nearEnabledCaches.contains(cacheId);
}
/**
@@ -2832,51 +2756,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (leftNode.order() > maxOrder)
return;
- filterNodeMap(aliveCacheNodes, leftNode);
-
- filterNodeMap(aliveRmtCacheNodes, leftNode);
-
aliveSrvNodesWithCaches.remove(leftNode);
}
/**
- * Creates a copy of nodes map without the given node.
- *
- * @param map Map to copy.
- * @param exclNode Node to exclude.
- */
- private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) {
- for (String cacheName : registeredCaches.keySet()) {
- String maskedName = maskNull(cacheName);
-
- while (true) {
- Collection<ClusterNode> oldNodes = map.get(maskedName);
-
- if (oldNodes == null || oldNodes.isEmpty())
- break;
-
- Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes);
-
- if (!newNodes.remove(exclNode))
- break;
-
- if (map.replace(maskedName, oldNodes, newNodes))
- break;
- }
- }
- }
-
- /**
- * Replaces {@code null} with {@code NULL_CACHE_NAME}.
- *
- * @param cacheName Cache name.
- * @return Masked name.
- */
- private String maskNull(@Nullable String cacheName) {
- return cacheName == null ? NULL_CACHE_NAME : cacheName;
- }
-
- /**
* @param topVer Topology version.
* @param nodes Nodes.
* @return Filtered collection (potentially empty, but never {@code null}).
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 47abf2f..d0d2df5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -453,7 +453,7 @@ public class GridCacheUtils {
* that may have already left).
*/
public static Collection<ClusterNode> allNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().cacheNodes(ctx.namex(), topOrder);
+ return ctx.discovery().cacheNodes(ctx.cacheId(), topOrder);
}
/**
@@ -486,7 +486,7 @@ public class GridCacheUtils {
* @return All nodes on which cache with the same name is started.
*/
public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) {
- return ctx.discovery().cacheAffinityNodes(ctx.namex(), AffinityTopologyVersion.NONE);
+ return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), AffinityTopologyVersion.NONE);
}
/**
@@ -497,7 +497,7 @@ public class GridCacheUtils {
* @return Affinity nodes.
*/
public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().cacheAffinityNodes(ctx.namex(), topOrder);
+ return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), topOrder);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 9e85bad..5cba57e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -196,7 +196,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
int nodesCnt;
if (!cctx.isLocal())
- nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size();
+ nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.cacheId(), topVer).size();
else
nodesCnt = 1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 816132d..3f7fd0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -89,7 +89,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
private volatile boolean stopping;
/** A future that will be completed when topology with version topVer will be ready to use. */
- private GridDhtTopologyFuture topReadyFut;
+ private volatile GridDhtTopologyFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -217,16 +217,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
- lock.readLock().lock();
-
- try {
- assert topReadyFut != null;
+ // TODO: read lock removed here; relies on topReadyFut being volatile - confirm this is sufficient.
+ assert topReadyFut != null;
- return topReadyFut;
- }
- finally {
- lock.readLock().unlock();
- }
+ return topReadyFut;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 519d0fc..fcb6ae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1204,8 +1204,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
if (expVer.equals(curVer))
return false;
- Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
- Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+ Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
+ Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1b4dcc9..b130ed9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -99,7 +99,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private volatile boolean stopping;
/** A future that will be completed when topology with version topVer will be ready to use. */
- private GridDhtTopologyFuture topReadyFut;
+ private volatile GridDhtTopologyFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -311,16 +311,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
- lock.readLock().lock();
-
- try {
- assert topReadyFut != null;
+ assert topReadyFut != null;
- return topReadyFut;
- }
- finally {
- lock.readLock().unlock();
- }
+ return topReadyFut;
}
/** {@inheritDoc} */
@@ -752,6 +745,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part != null)
list.add(part);
}
+
return list;
}
@@ -831,6 +825,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
List<ClusterNode> affNodes = affAssignment.get(p);
+ if (CU.cheatCache(cctx.cacheId()))
+ return affNodes;
+
lock.readLock().lock();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 0e60ff4..e9c21cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -1684,6 +1683,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
+ res.partition(req.partition());
+
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -1704,7 +1705,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtPartitionTopology top = topology();
- top.readLock();
+ if (!CU.cheatCache(ctx.cacheId()))
+ top.readLock();
try {
if (top.stopping()) {
@@ -1737,7 +1739,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (ver == null) {
// Assign next version for update inside entries lock.
- ver = ctx.versions().next(top.topologyVersion());
+ ver = ctx.versions().next(top.topologyVersion(), req.partition());
if (hasNear)
res.nearVersion(ver);
@@ -1819,7 +1821,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
remap = true;
}
finally {
- top.readUnlock();
+ if (!CU.cheatCache(ctx.cacheId()))
+ top.readUnlock();
}
}
catch (GridCacheEntryRemovedException e) {
@@ -2344,7 +2347,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
boolean readersOnly = false;
@@ -2579,7 +2582,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
CacheStorePartialUpdateException storeErr = null;
@@ -2818,6 +2821,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = entryExx(key, topVer);
+ if (CU.cheatCache(ctx.cacheId())) // TODO
+ return Collections.singletonList(entry);
+
GridUnsafe.monitorEnter(entry);
if (entry.obsolete())
@@ -2893,6 +2899,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param topVer Topology version.
*/
private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) {
+ if (CU.cheatCache(ctx.cacheId()))
+ return;
+
// Process deleted entries before locks release.
assert ctx.deferredDelete() : this;
@@ -2904,7 +2913,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (GridCacheMapEntry entry : locked) {
if (entry != null && entry.deleted()) {
if (skip == null)
- skip = new HashSet<>(locked.size(), 1.0f);
+ skip = U.newHashSet(locked.size());
skip.add(entry.key());
}
@@ -3050,7 +3059,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = updateReq.topologyVersion();
- Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
+ Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(ctx.cacheId(), topVer);
// We are on primary node for some key.
assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index bd231cf..f148cbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -386,54 +386,58 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
- cache.topology().readLock();
-
AffinityTopologyVersion topVer = null;
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
+ if (!CU.cheatCache(cctx.cacheId())) {
+ cache.topology().readLock();
- return;
- }
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+ return;
+ }
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- if (err != null) {
- onDone(err);
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- return;
- }
+ if (err != null) {
+ onDone(err);
- topVer = fut.topologyVersion();
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
+ return;
+ }
+
+ topVer = fut.topologyVersion();
}
- else
- onDone(new GridCacheTryPutFailedException());
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
- return;
+ return;
+ }
+ }
+ finally {
+ cache.topology().readUnlock();
}
}
- finally {
- cache.topology().readUnlock();
- }
+ else
+ topVer = cache.topology().topologyVersionFuture().topologyVersion();
map(topVer);
}
@@ -461,7 +465,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
updVer = this.updVer;
if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ updVer = futVer;
if (log.isDebugEnabled())
log.debug("Assigned fast-map version for update on near node: " + updVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index cd64117..0b917e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -495,54 +495,58 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
- cache.topology().readLock();
-
AffinityTopologyVersion topVer = null;
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
+ if (!CU.cheatCache(cctx.cacheId())) {
+ cache.topology().readLock();
- return;
- }
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+ return;
+ }
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- if (err != null) {
- onDone(err);
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- return;
- }
+ if (err != null) {
+ onDone(err);
- topVer = fut.topologyVersion();
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
+ return;
+ }
+
+ topVer = fut.topologyVersion();
}
- else
- onDone(new GridCacheTryPutFailedException());
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
- return;
+ return;
+ }
+ }
+ finally {
+ cache.topology().readUnlock();
}
}
- finally {
- cache.topology().readUnlock();
- }
+ else
+ topVer = cache.topology().topologyVersionFuture().topologyVersion();
map(topVer, null);
}
@@ -635,7 +639,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
updVer = this.updVer;
if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ //updVer = cctx.versions().next(topVer);
+ updVer = futVer;
if (log.isDebugEnabled())
log.debug("Assigned fast-map version for update on near node: " + updVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 2e38733..22e01ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -105,6 +105,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Near expire times. */
private GridLongList nearExpireTimes;
+ /** Partition ID. */
+ private int partId = -1;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -154,6 +157,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
+ * @param partId Partition ID for proper striping on near node.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
+ }
+
+ /**
* Sets update error.
*
* @param err Error.
@@ -431,6 +441,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/** {@inheritDoc} */
+ @Override public int partition() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
}
@@ -510,12 +525,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 12:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeInt("partId", partId))
return false;
writer.incrementState();
case 13:
+ if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
if (!writer.writeMessage("ret", ret))
return false;
@@ -610,7 +631,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 12:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+ partId = reader.readInt("partId");
if (!reader.isLastRead())
return false;
@@ -618,6 +639,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 13:
+ remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
ret = reader.readMessage("ret");
if (!reader.isLastRead())
@@ -637,7 +666,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 15;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
index 95aab74..ccc17e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
@@ -354,4 +354,4 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>,
", order=" + order() +
", nodeOrder=" + nodeOrder() + ']';
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 9be8b50..fea5ee9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.version;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -64,6 +65,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
private GridCacheVersion ISOLATED_STREAMER_VER;
/** */
+ private final AtomicLongArray orders = new AtomicLongArray(Runtime.getRuntime().availableProcessors());
+
+ /** */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt.type() == EVT_NODE_METRICS_UPDATED;
@@ -195,7 +199,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on current topology.
*/
public GridCacheVersion next() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId,
+ -1);
}
/**
@@ -203,7 +208,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on current topology with given data center id.
*/
public GridCacheVersion next(byte dataCenterId) {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId,
+ -1);
}
/**
@@ -215,7 +221,21 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given topology version.
*/
public GridCacheVersion next(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, false, dataCenterId);
+ return next(topVer.topologyVersion(), true, false, dataCenterId,
+ -1);
+ }
+
+ /**
+ * Gets next version based on given topology version. Given value should be
+ * real topology version calculated as number of grid topology changes and
+ * obtained from discovery manager.
+ *
+ * @param topVer Topology version for which new version should be obtained.
+ * @param part Partition ID; selects the striped order counter and is encoded into the high bits of the order.
+ * @return Next version based on given topology version.
+ */
+ public GridCacheVersion next(AffinityTopologyVersion topVer, int part) {
+ return next(topVer.topologyVersion(), true, false, dataCenterId, part);
}
/**
@@ -224,7 +244,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId,
+ -1);
}
/**
@@ -233,7 +254,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, true, dataCenterId);
+ return next(topVer.topologyVersion(), true, true, dataCenterId,
+ -1);
}
/**
@@ -242,7 +264,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, true, dataCenterId);
+ return next(ver.topologyVersion(), false, true, dataCenterId,
+ -1);
}
/**
@@ -252,7 +275,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given cache version.
*/
public GridCacheVersion next(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, false, dataCenterId);
+ return next(ver.topologyVersion(), false, false, dataCenterId,
+ -1);
}
/**
@@ -265,9 +289,16 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @param addTime If {@code true} then adds to the given topology version number of seconds
* from the start time of the first grid node.
* @param dataCenterId Data center id.
+ * @param part Partition ID, or {@code -1} to use the shared (non-striped) order counter.
* @return New lock order.
*/
- private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, byte dataCenterId) {
+ private GridCacheVersion next(
+ long topVer,
+ boolean addTime,
+ boolean forLoad,
+ byte dataCenterId,
+ int part
+ ) {
if (topVer == -1)
topVer = cctx.kernalContext().discovery().topologyVersion();
@@ -282,7 +313,8 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
int locNodeOrder = (int)cctx.localNode().order();
- long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
+ long ord = forLoad ? loadOrder.incrementAndGet() :
+ part == -1 ? order.incrementAndGet() : (orders.incrementAndGet(part % orders.length()) | (((long)part) << 48 ));
GridCacheVersion next = new GridCacheVersion(
(int)topVer,
@@ -291,7 +323,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
locNodeOrder,
dataCenterId);
- last = next;
+ // last = next; // TODO: restore; while this is commented out, next() no longer updates the value returned by last().
return next;
}
@@ -304,4 +336,4 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
public GridCacheVersion last() {
return last;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 0764316..36178f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -273,31 +273,33 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
* @return Adjusted time.
*/
public long adjustedTime(long topVer) {
- T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
-
- GridClockDeltaSnapshot snap;
-
- if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
- snap = fastSnap.get2();
- else {
- // Get last synchronized time on given topology version.
- Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
- new GridClockDeltaVersion(0, topVer + 1));
-
- snap = entry == null ? null : entry.getValue();
- }
-
- long now = clockSrc.currentTimeMillis();
-
- if (snap == null)
- return now;
-
- Long delta = snap.deltas().get(ctx.localNodeId());
-
- if (delta == null)
- delta = 0L;
-
- return now + delta;
+// T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
+//
+// GridClockDeltaSnapshot snap;
+//
+// if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
+// snap = fastSnap.get2();
+// else {
+// // Get last synchronized time on given topology version.
+// Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
+// new GridClockDeltaVersion(0, topVer + 1));
+//
+// snap = entry == null ? null : entry.getValue();
+// }
+//
+// long now = clockSrc.currentTimeMillis();
+//
+// if (snap == null)
+// return now;
+//
+// Long delta = snap.deltas().get(ctx.localNodeId());
+//
+// if (delta == null)
+// delta = 0L;
+//
+// return now + delta;
+
+ return System.currentTimeMillis();
}
/**
@@ -478,4 +480,4 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
curr.onNodeLeft(nodeId);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 31b4bc7..03cbdbd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@ -23,16 +23,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgnitePredicate;
@@ -158,8 +153,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
stopTempNodes();
latch.await();
-
- validateAlives();
}
}
@@ -200,55 +193,6 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
}
/**
- * Validates that all node collections contain actual information.
- */
- @SuppressWarnings("SuspiciousMethodCalls")
- private void validateAlives() {
- for (Ignite g : alive) {
- log.info("Validate node: " + g.name());
-
- assertEquals("Unexpected nodes number for node: " + g.name(), PERM_NODES_CNT, g.cluster().nodes().size());
- }
-
- for (final Ignite g : alive) {
- IgniteKernal k = (IgniteKernal)g;
-
- GridDiscoveryManager discoMgr = k.context().discovery();
-
- final Collection<ClusterNode> currTop = g.cluster().nodes();
-
- long currVer = discoMgr.topologyVersion();
-
- long startVer = discoMgr.localNode().order();
-
- for (long v = currVer; v > currVer - GridDiscoveryManager.DISCOVERY_HISTORY_SIZE && v >= startVer; v--) {
- F.forAll(discoMgr.aliveCacheNodes(null, new AffinityTopologyVersion(v)),
- new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode e) {
- return currTop.contains(e);
- }
- });
-
- F.forAll(discoMgr.aliveRemoteCacheNodes(null, new AffinityTopologyVersion(v)),
- new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode e) {
- return currTop.contains(e) || g.cluster().localNode().equals(e);
- }
- });
-
- GridCacheSharedContext<?, ?> ctx = k.context().cache().context();
-
- ClusterNode oldest =
- ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer));
-
- assertNotNull(oldest);
-
- assertTrue(currTop.contains(oldest));
- }
- }
- }
-
- /**
* Starts temporary nodes.
*
* @throws Exception If failed.
@@ -293,4 +237,4 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
G.stop(g.name(), false);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0706f1af/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
index c9179d4..230a3bc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
@@ -211,4 +211,4 @@ public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTes
public static class ClientDiscovery extends GridDiscoveryManagerSelfTest {
// No-op.
}
-}
\ No newline at end of file
+}