You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/02 11:31:10 UTC
[33/50] [abbrv] ignite git commit: Performance optimizations.
Performance optimizations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b02ad0de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b02ad0de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b02ad0de
Branch: refs/heads/ignite-4565-ddl
Commit: b02ad0deaae78424356f9a4b1748fc43b21eac03
Parents: 7cb3e68
Author: yzhdanov <yz...@apache.org>
Authored: Fri Feb 17 14:10:20 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 17 14:10:20 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalGatewayImpl.java | 25 +-
.../org/apache/ignite/internal/GridTopic.java | 2 +-
.../client/util/GridClientConsistentHash.java | 14 +-
.../managers/communication/GridIoManager.java | 29 ++-
.../discovery/GridDiscoveryManager.java | 258 +++++--------------
.../eventstorage/GridEventStorageManager.java | 34 ++-
.../affinity/GridAffinityAssignmentCache.java | 8 +-
.../cache/CacheAffinitySharedManager.java | 2 +-
.../cache/GridCacheEvictionManager.java | 60 +++--
.../processors/cache/GridCacheGateway.java | 48 ++--
.../processors/cache/GridCacheIoManager.java | 19 +-
.../processors/cache/GridCacheUtils.java | 6 +-
.../cache/affinity/GridCacheAffinityImpl.java | 2 +-
.../dht/GridClientPartitionTopology.java | 13 +-
.../dht/GridDhtAssignmentFetchFuture.java | 5 +-
.../distributed/dht/GridDhtCacheAdapter.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 14 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 7 +-
.../GridDhtAtomicAbstractUpdateRequest.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 16 +-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 6 +-
.../GridDhtAtomicSingleUpdateRequest.java | 5 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 6 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 10 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 31 ++-
.../GridNearAtomicSingleUpdateFuture.java | 7 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 7 +-
.../atomic/GridNearAtomicUpdateResponse.java | 35 ++-
.../dht/preloader/GridDhtPreloader.java | 39 ++-
.../near/GridNearSingleGetRequest.java | 5 +
.../cache/transactions/IgniteTxHandler.java | 4 +-
.../cache/version/GridCacheVersion.java | 2 +-
.../cache/version/GridCacheVersionManager.java | 2 +-
.../clock/GridClockSyncProcessor.java | 2 +-
.../ignite/internal/util/GridBusyLock.java | 2 +-
.../util/StripedCompositeReadWriteLock.java | 50 +++-
.../ignite/internal/util/StripedExecutor.java | 10 +-
.../nio/GridAbstractCommunicationClient.java | 37 +--
.../util/nio/GridCommunicationClient.java | 5 -
.../communication/tcp/TcpCommunicationSpi.java | 9 +
.../org/apache/ignite/thread/IgniteThread.java | 26 +-
.../GridDiscoveryManagerAliveCacheSelfTest.java | 55 ----
.../discovery/GridDiscoveryManagerSelfTest.java | 214 ---------------
.../testsuites/IgniteKernalSelfTestSuite.java | 5 +-
.../ignite/tools/classgen/ClassesGenerator.java | 4 +-
45 files changed, 462 insertions(+), 684 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
index fe8c580..7cbf84a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
@@ -22,9 +22,11 @@ import java.io.Serializable;
import java.io.StringWriter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -40,7 +42,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** */
@GridToStringExclude
- private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
+ private final ReadWriteLock rwLock =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** */
@GridToStringExclude
@@ -73,13 +76,15 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
if (stackTrace == null)
stackTrace = stackTrace();
- rwLock.readLock();
+ Lock lock = rwLock.readLock();
+
+ lock.lock();
GridKernalState state = this.state.get();
if (state != GridKernalState.STARTED) {
// Unlock just acquired lock.
- rwLock.readUnlock();
+ lock.unlock();
if (state == GridKernalState.DISCONNECTED) {
assert reconnectFut != null;
@@ -96,7 +101,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
if (stackTrace == null)
stackTrace = stackTrace();
- rwLock.readLock();
+ rwLock.readLock().lock();
if (state.get() == GridKernalState.DISCONNECTED)
throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName);
@@ -104,7 +109,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public void readUnlock() {
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
}
/** {@inheritDoc} */
@@ -118,7 +123,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
// Busy wait is intentional.
while (true)
try {
- if (rwLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ if (rwLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
@@ -135,7 +140,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public boolean tryWriteLock(long timeout) throws InterruptedException {
- boolean acquired = rwLock.tryWriteLock(timeout, TimeUnit.MILLISECONDS);
+ boolean acquired = rwLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);
if (acquired) {
if (stackTrace == null)
@@ -194,7 +199,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
/** {@inheritDoc} */
@Override public void writeUnlock() {
- rwLock.writeUnlock();
+ rwLock.writeLock().unlock();
}
/** {@inheritDoc} */
@@ -222,4 +227,4 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable {
@Override public String toString() {
return S.toString(GridKernalGatewayImpl.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 2962540..c2e0452 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -769,4 +769,4 @@ public enum GridTopic {
return S.toString(T8.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
index 8134906..0c9a3fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
@@ -439,13 +439,9 @@ public class GridClientConsistentHash<N> {
/** {@inheritDoc} */
@Override public String toString() {
- StringBuilder sb = new StringBuilder(getClass().getSimpleName());
-
- sb.append(" [affSeed=").append(affSeed).
- append(", circle=").append(circle).
- append(", nodesComp=").append(nodesComp).
- append(", nodes=").append(nodes).append("]");
-
- return sb.toString();
+ return getClass().getSimpleName() + " [affSeed=" + affSeed +
+ ", circle=" + circle +
+ ", nodesComp=" + nodesComp +
+ ", nodes=" + nodes + "]";
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 84b4543..108ecd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
@@ -57,7 +58,7 @@ import org.apache.ignite.internal.processors.platform.message.PlatformMessageFil
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -160,7 +161,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final Marshaller marsh;
/** Busy lock. */
- private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+ private final ReadWriteLock busyLock =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** Lock to sync maps access. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -577,7 +579,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// Busy wait is intentional.
while (true) {
try {
- if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ if (busyLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
@@ -601,7 +603,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
stopping = true;
}
finally {
- busyLock.writeUnlock();
+ busyLock.writeLock().unlock();
}
}
@@ -623,7 +625,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert nodeId != null;
assert msg != null;
- busyLock.readLock();
+ Lock busyLock0 = busyLock.readLock();
+
+ busyLock0.lock();
try {
if (stopping) {
@@ -712,7 +716,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
U.error(log, "Failed to process message (will ignore): " + msg, e);
}
finally {
- busyLock.readUnlock();
+ busyLock0.unlock();
}
}
@@ -798,11 +802,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
- if (msg0.processFromNioThread()) {
+ if (msg0.processFromNioThread())
c.run();
+ else
+ ctx.getStripedExecutorService().execute(-1, c);
- return;
- }
+ return;
}
if (ctx.config().getStripedPoolSize() > 0 &&
@@ -2173,7 +2178,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- busyLock.readLock();
+ Lock lock = busyLock.readLock();
+
+ lock.lock();
try {
if (stopping) {
@@ -2251,7 +2258,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
}
finally {
- busyLock.readUnlock();
+ lock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 71d8ad9..9ea707d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.managers.discovery;
-import java.io.Externalizable;
import java.io.Serializable;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
@@ -74,6 +73,7 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -144,9 +144,6 @@ import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP;
* Discovery SPI manager.
*/
public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
- /** Fake key for {@code null}-named caches. Used inside {@link DiscoCache}. */
- private static final String NULL_CACHE_NAME = UUID.randomUUID().toString();
-
/** Metrics update frequency. */
private static final long METRICS_UPDATE_FREQ = 3000;
@@ -1577,7 +1574,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).allNodes();
+ return resolveDiscoCache(CU.cacheId(null), topVer).allNodes();
}
/**
@@ -1585,7 +1582,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return All server nodes for given topology version.
*/
public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).srvNodes;
+ return resolveDiscoCache(CU.cacheId(null), topVer).srvNodes;
}
/**
@@ -1596,7 +1593,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Node.
*/
public ClusterNode node(AffinityTopologyVersion topVer, UUID id) {
- return resolveDiscoCache(null, topVer).node(id);
+ return resolveDiscoCache(CU.cacheId(null), topVer).node(id);
}
/**
@@ -1607,49 +1604,38 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache nodes.
*/
public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName, topVer.topologyVersion());
+ return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName, topVer.topologyVersion());
}
/**
- * Gets all nodes with at least one cache configured.
+ * Gets cache nodes for cache with given ID.
*
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).allNodesWithCaches(topVer.topologyVersion());
+ public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId, topVer.topologyVersion());
}
/**
- * Gets cache remote nodes for cache with given name.
- *
- * @param topVer Topology version.
- * @return Collection of cache nodes.
- */
- public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
- return resolveDiscoCache(null, topVer).remoteCacheNodes(topVer.topologyVersion());
- }
-
- /**
- * Gets cache nodes for cache with given name.
+ * Gets all nodes with at least one cache configured.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion());
+ public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches(topVer.topologyVersion());
}
/**
* Gets cache remote nodes for cache with given name.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of cache nodes.
*/
- Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion());
+ public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(CU.cacheId(null), topVer).remoteCacheNodes(topVer.topologyVersion());
}
/**
@@ -1657,7 +1643,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Oldest alive server nodes with at least one cache configured.
*/
@Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
- DiscoCache cache = resolveDiscoCache(null, topVer);
+ DiscoCache cache = resolveDiscoCache(CU.cacheId(null), topVer);
Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
@@ -1672,7 +1658,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of cache affinity nodes.
*/
public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName, topVer.topologyVersion());
+ int cacheId = CU.cacheId(cacheName);
+
+ return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
+ }
+
+ /**
+ * Gets cache nodes for cache with given ID that participate in affinity calculation.
+ *
+ * @param cacheId Cache ID.
+ * @param topVer Topology version.
+ * @return Collection of cache affinity nodes.
+ */
+ public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
}
/**
@@ -1742,31 +1741,34 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Checks if cache with given name has at least one node with near cache enabled.
+ * Checks if cache with given ID has at least one node with near cache enabled.
*
- * @param cacheName Cache name.
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return {@code True} if cache with given name has at least one node with near cache enabled.
*/
- public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).hasNearCache(cacheName);
+ public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) {
+ return resolveDiscoCache(cacheId, topVer).hasNearCache(cacheId);
}
/**
* Gets discovery cache for given topology version.
*
- * @param cacheName Cache name (participates in exception message).
+ * @param cacheId Cache ID (participates in exception message).
* @param topVer Topology version.
* @return Discovery cache.
*/
- private DiscoCache resolveDiscoCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
+ private DiscoCache resolveDiscoCache(int cacheId, AffinityTopologyVersion topVer) {
Snapshot snap = topSnap.get();
DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ?
snap.discoCache : discoCacheHist.get(topVer);
if (cache == null) {
- throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName +
+ DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
+
+ throw new IgniteException("Failed to resolve nodes topology [" +
+ "cacheName=" + (desc != null ? desc.cacheConfiguration().getName() : "N/A") +
", topVer=" + topVer +
", history=" + discoCacheHist.keySet() +
", snap=" + snap +
@@ -2093,19 +2095,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
evts.add(new GridTuple5<>(type, topVer, node, topSnapshot, data));
}
- /**
- * @param node Node to get a short description for.
- * @return Short description for the node to be used in 'quiet' mode.
- */
- private String quietNode(ClusterNode node) {
- assert node != null;
-
- return "nodeId8=" + node.id().toString().substring(0, 8) + ", " +
- "addrs=" + U.addressesAsString(node) + ", " +
- "order=" + node.order() + ", " +
- "CPUs=" + node.metrics().getTotalCpus();
- }
-
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
while (!isCancelled()) {
@@ -2415,11 +2404,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Topology await version. */
private long awaitVer;
- /** Empty constructor required by {@link Externalizable}. */
- private DiscoTopologyFuture() {
- // No-op.
- }
-
/**
* @param ctx Context.
* @param awaitVer Await version.
@@ -2509,19 +2493,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** Cache nodes by cache name. */
@GridToStringInclude
- private final Map<String, Collection<ClusterNode>> allCacheNodes;
-
- /** Remote cache nodes by cache name. */
- @GridToStringInclude
- private final Map<String, Collection<ClusterNode>> rmtCacheNodes;
+ private final Map<Integer, Collection<ClusterNode>> allCacheNodes;
/** Cache nodes by cache name. */
@GridToStringInclude
- private final Map<String, Collection<ClusterNode>> affCacheNodes;
+ private final Map<Integer, Collection<ClusterNode>> affCacheNodes;
/** Caches where at least one node has near cache enabled. */
@GridToStringInclude
- private final Set<String> nearEnabledCaches;
+ private final Set<Integer> nearEnabledCaches;
/** Nodes grouped by version. */
private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer;
@@ -2539,18 +2519,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final long maxOrder;
/**
- * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes;
-
- /**
- * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link
- * #maskNull(String)} before passing raw cache names to it.
- */
- private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
-
- /**
* Cached alive server remote nodes with caches.
*/
private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
@@ -2578,20 +2546,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
allNodes = Collections.unmodifiableList(all);
- Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f);
- Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f);
- Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size());
- Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size());
+ Map<Integer, Collection<ClusterNode>> cacheMap = U.newHashMap(allNodes.size());
+ Map<Integer, Collection<ClusterNode>> dhtNodesMap = U.newHashMap(allNodes.size());
+ Collection<ClusterNode> nodesWithCaches = U.newHashSet(allNodes.size());
+ Collection<ClusterNode> rmtNodesWithCaches = U.newHashSet(allNodes.size());
- aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
- aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
nodesByVer = new TreeMap<>();
long maxOrder0 = 0;
- Set<String> nearEnabledSet = new HashSet<>();
+ Set<Integer> nearEnabledSet = new HashSet<>();
List<ClusterNode> srvNodes = new ArrayList<>();
@@ -2620,21 +2585,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
addToMap(cacheMap, cacheName, node);
- if (alive(node.id()))
- addToMap(aliveCacheNodes, maskNull(cacheName), node);
-
if (filter.dataNode(node))
addToMap(dhtNodesMap, cacheName, node);
if (filter.nearNode(node))
- nearEnabledSet.add(cacheName);
-
- if (!loc.id().equals(node.id())) {
- addToMap(rmtCacheMap, cacheName, node);
-
- if (alive(node.id()))
- addToMap(aliveRmtCacheNodes, maskNull(cacheName), node);
- }
+ nearEnabledSet.add(CU.cacheId(cacheName));
hasCaches = true;
}
@@ -2674,7 +2629,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
maxOrder = maxOrder0;
allCacheNodes = Collections.unmodifiableMap(cacheMap);
- rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap);
affCacheNodes = Collections.unmodifiableMap(dhtNodesMap);
allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
@@ -2684,7 +2638,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
daemonNodes = Collections.unmodifiableList(new ArrayList<>(
F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON))));
- Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f);
+ Map<UUID, ClusterNode> nodeMap = U.newHashMap(allNodes().size() + daemonNodes.size());
for (ClusterNode n : F.concat(false, allNodes(), daemonNodes()))
nodeMap.put(n.id(), n);
@@ -2699,13 +2653,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @param cacheName Cache name.
* @param rich Node to add
*/
- private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
- Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName);
+ private void addToMap(Map<Integer, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+ Collection<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
if (cacheNodes == null) {
cacheNodes = new ArrayList<>(allNodes.size());
- cacheMap.put(cacheName, cacheNodes);
+ cacheMap.put(CU.cacheId(cacheName), cacheNodes);
}
cacheNodes.add(rich);
@@ -2727,28 +2681,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * Gets collection of nodes which have version equal or greater than {@code ver}.
- *
- * @param ver Version to check.
- * @return Collection of nodes with version equal or greater than {@code ver}.
- */
- Collection<ClusterNode> elderNodes(IgniteProductVersion ver) {
- Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver);
-
- if (entry == null)
- return Collections.emptyList();
-
- return entry.getValue();
- }
-
- /**
- * @return Versions map.
- */
- NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() {
- return nodesByVer;
- }
-
- /**
* Gets collection of nodes with at least one cache configured.
*
* @param topVer Topology version (maximum allowed node order).
@@ -2766,61 +2698,50 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
* @return Collection of nodes.
*/
Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, allCacheNodes.get(cacheName));
+ return filter(topVer, allCacheNodes.get(CU.cacheId(cacheName)));
}
/**
- * Gets all remote nodes that have at least one cache configured.
+ * Gets all nodes that have cache with given ID.
*
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> remoteCacheNodes(final long topVer) {
- return filter(topVer, rmtNodesWithCaches);
- }
-
- /**
- * Gets all nodes that have cache with given name and should participate in affinity calculation. With
- * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, affCacheNodes.get(cacheName));
+ Collection<ClusterNode> cacheNodes(Integer cacheId, final long topVer) {
+ return filter(topVer, allCacheNodes.get(cacheId));
}
/**
- * Gets all alive nodes that have cache with given name.
+ * Gets all remote nodes that have at least one cache configured.
*
- * @param cacheName Cache name.
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveCacheNodes.get(maskNull(cacheName)));
+ Collection<ClusterNode> remoteCacheNodes(final long topVer) {
+ return filter(topVer, rmtNodesWithCaches);
}
/**
- * Gets all alive remote nodes that have cache with given name.
+ * Gets all nodes that have cache with given ID and should participate in affinity calculation. With
+ * partitioned cache nodes with near-only cache do not participate in affinity node calculation.
*
- * @param cacheName Cache name.
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @return Collection of nodes.
*/
- Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName)));
+ Collection<ClusterNode> cacheAffinityNodes(int cacheId, final long topVer) {
+ return filter(topVer, affCacheNodes.get(cacheId));
}
/**
- * Checks if cache with given name has at least one node with near cache enabled.
+ * Checks if cache with given ID has at least one node with near cache enabled.
*
- * @param cacheName Cache name.
+ * @param cacheId Cache ID.
* @return {@code True} if cache with given name has at least one node with near cache enabled.
*/
- boolean hasNearCache(@Nullable String cacheName) {
- return nearEnabledCaches.contains(cacheName);
+ boolean hasNearCache(int cacheId) {
+ return nearEnabledCaches.contains(cacheId);
}
/**
@@ -2832,51 +2753,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
if (leftNode.order() > maxOrder)
return;
- filterNodeMap(aliveCacheNodes, leftNode);
-
- filterNodeMap(aliveRmtCacheNodes, leftNode);
-
aliveSrvNodesWithCaches.remove(leftNode);
}
/**
- * Creates a copy of nodes map without the given node.
- *
- * @param map Map to copy.
- * @param exclNode Node to exclude.
- */
- private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) {
- for (String cacheName : registeredCaches.keySet()) {
- String maskedName = maskNull(cacheName);
-
- while (true) {
- Collection<ClusterNode> oldNodes = map.get(maskedName);
-
- if (oldNodes == null || oldNodes.isEmpty())
- break;
-
- Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes);
-
- if (!newNodes.remove(exclNode))
- break;
-
- if (map.replace(maskedName, oldNodes, newNodes))
- break;
- }
- }
- }
-
- /**
- * Replaces {@code null} with {@code NULL_CACHE_NAME}.
- *
- * @param cacheName Cache name.
- * @return Masked name.
- */
- private String maskNull(@Nullable String cacheName) {
- return cacheName == null ? NULL_CACHE_NAME : cacheName;
- }
-
- /**
* @param topVer Topology version.
* @param nodes Nodes.
* @return Filtered collection (potentially empty, but never {@code null}).
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 607bb96..b5d5ee2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -100,6 +100,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
/** Events of these types should be recorded. */
private volatile int[] inclEvtTypes;
+ /** Stopped flag. */
+ private boolean stopped;
+
/**
* Maps event type to boolean ({@code true} for recordable events).
* This array is used for listeners notification. It may be wider,
@@ -212,7 +215,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
- return busyLock.readLock().tryLock();
+ if (!busyLock.readLock().tryLock())
+ return false;
+
+ if (stopped) {
+ busyLock.readLock().unlock();
+
+ return false;
+ }
+
+ return true;
}
/**
@@ -225,15 +237,23 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
/** {@inheritDoc} */
@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
@Override public void onKernalStop0(boolean cancel) {
- // Acquire write lock so that any new thread could not be started.
busyLock.writeLock().lock();
- if (msgLsnr != null)
- ctx.io().removeMessageListener(TOPIC_EVENT, msgLsnr);
+ try {
+ if (msgLsnr != null)
+ ctx.io().removeMessageListener(
+ TOPIC_EVENT,
+ msgLsnr);
+
+ msgLsnr = null;
- msgLsnr = null;
+ lsnrs.clear();
- lsnrs.clear();
+ stopped = true;
+ }
+ finally {
+ busyLock.writeLock().unlock();
+ }
}
/** {@inheritDoc} */
@@ -1203,4 +1223,4 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
return lsnr.hashCode();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..144b162 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -63,7 +63,7 @@ public class GridAffinityAssignmentCache {
private final String cacheName;
/** */
- private final Integer cacheId;
+ private final int cacheId;
/** Number of backups. */
private final int backups;
@@ -169,7 +169,7 @@ public class GridAffinityAssignmentCache {
/**
* @return Cache ID.
*/
- public Integer cacheId() {
+ public int cacheId() {
return cacheId;
}
@@ -266,7 +266,7 @@ public class GridAffinityAssignmentCache {
List<ClusterNode> sorted;
if (!locCache) {
- sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer));
+ sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheId(), topVer));
Collections.sort(sorted, GridNodeOrderComparator.INSTANCE);
}
@@ -617,4 +617,4 @@ public class GridAffinityAssignmentCache {
return S.toString(AffinityReadyFuture.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2890887..7bf5fd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -843,7 +843,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
return true;
// If local node did not initiate exchange or local node is the only cache node in grid.
- Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheName(), fut.topologyVersion());
+ Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion());
DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index f8722d6..9284143 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -160,6 +160,9 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
/** Stopping flag. */
private volatile boolean stopping;
+ /** Stopped flag. */
+ private boolean stopped;
+
/** Current future. */
private final AtomicReference<EvictionFuture> curEvictFut = new AtomicReference<>();
@@ -311,19 +314,28 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
busyLock.block();
- // Stop backup worker.
- if (evictSync && !cctx.isNear() && backupWorker != null) {
- backupWorker.cancel();
+ try {
+ // Stop backup worker.
+ if (evictSync && !cctx.isNear() && backupWorker != null) {
+ backupWorker.cancel();
- U.join(backupWorkerThread, log);
- }
+ U.join(
+ backupWorkerThread,
+ log);
+ }
- // Cancel all active futures.
- for (EvictionFuture fut : futs.values())
- fut.cancel();
+ // Cancel all active futures.
+ for (EvictionFuture fut : futs.values())
+ fut.cancel();
- if (log.isDebugEnabled())
- log.debug("Eviction manager stopped on node: " + cctx.nodeId());
+ if (log.isDebugEnabled())
+ log.debug("Eviction manager stopped on node: " + cctx.nodeId());
+ }
+ finally {
+ stopped = true;
+
+ busyLock.unblock();
+ }
}
/**
@@ -345,7 +357,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
log.debug("Processing eviction response [node=" + nodeId + ", localNode=" + cctx.nodeId() +
", res=" + res + ']');
- if (!busyLock.enterBusy())
+ if (!enterBusy())
return;
try {
@@ -363,6 +375,22 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
}
/**
+ * @return {@code True} if entered busy.
+ */
+ private boolean enterBusy() {
+ if (!busyLock.enterBusy())
+ return false;
+
+ if (stopped) {
+ busyLock.leaveBusy();
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @param nodeId Sender node ID.
* @param req Request.
*/
@@ -370,7 +398,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
assert nodeId != null;
assert req != null;
- if (!busyLock.enterBusy())
+ if (!enterBusy())
return;
try {
@@ -811,7 +839,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (!cctx.isNear() && evictSync && !cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), topVer))
return;
- if (!busyLock.enterBusy())
+ if (!enterBusy())
return;
try {
@@ -1145,7 +1173,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
- if (!busyLock.enterBusy()) {
+ if (!enterBusy()) {
if (log.isDebugEnabled())
log.debug("Will not notify eviction future completion (grid is stopping): " +
f);
@@ -1187,7 +1215,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
* @param topVer Topology version on future complete.
*/
private void onFutureCompleted(EvictionFuture fut, AffinityTopologyVersion topVer) {
- if (!busyLock.enterBusy())
+ if (!enterBusy())
return;
try {
@@ -1366,7 +1394,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
if (!evictSyncAgr)
return;
- if (!busyLock.enterBusy())
+ if (!enterBusy())
return;
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 1562d70..1bf9468 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -17,13 +17,15 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -45,7 +47,8 @@ public class GridCacheGateway<K, V> {
private IgniteFuture<?> reconnectFut;
/** */
- private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
+ private StripedCompositeReadWriteLock rwLock =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/**
* @param ctx Cache context.
@@ -63,7 +66,7 @@ public class GridCacheGateway<K, V> {
if (ctx.deploymentEnabled())
ctx.deploy().onEnter();
- rwLock.readLock();
+ rwLock.readLock().lock();
checkState(true, true);
}
@@ -78,7 +81,7 @@ public class GridCacheGateway<K, V> {
if (state != State.STARTED) {
if (lock)
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
if (state == State.STOPPED) {
if (stopErr)
@@ -106,7 +109,7 @@ public class GridCacheGateway<K, V> {
onEnter();
// Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
- rwLock.readLock();
+ rwLock.readLock().lock();
return checkState(true, false);
}
@@ -139,10 +142,10 @@ public class GridCacheGateway<K, V> {
*/
public void leave() {
try {
- leaveNoLock();
+ leaveNoLock();
}
finally {
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
}
}
@@ -168,7 +171,9 @@ public class GridCacheGateway<K, V> {
onEnter();
- rwLock.readLock();
+ Lock lock = rwLock.readLock();
+
+ lock.lock();
checkState(true, true);
@@ -178,7 +183,7 @@ public class GridCacheGateway<K, V> {
return setOperationContextPerCall(opCtx);
}
catch (Throwable e) {
- rwLock.readUnlock();
+ lock.unlock();
throw e;
}
@@ -219,7 +224,7 @@ public class GridCacheGateway<K, V> {
leaveNoLock(prev);
}
finally {
- rwLock.readUnlock();
+ rwLock.readLock().unlock();
}
}
@@ -269,14 +274,14 @@ public class GridCacheGateway<K, V> {
*
*/
public void writeLock(){
- rwLock.writeLock();
+ rwLock.writeLock().lock();
}
/**
*
*/
public void writeUnlock() {
- rwLock.writeUnlock();
+ rwLock.writeLock().unlock();
}
/**
@@ -295,15 +300,14 @@ public class GridCacheGateway<K, V> {
boolean interrupted = false;
while (true) {
- if (rwLock.tryWriteLock())
- break;
- else {
- try {
+ try {
+ if (rwLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
+ break;
+ else
U.sleep(200);
- }
- catch (IgniteInterruptedCheckedException ignore) {
- interrupted = true;
- }
+ }
+ catch (IgniteInterruptedCheckedException | InterruptedException ignore) {
+ interrupted = true;
}
}
@@ -314,7 +318,7 @@ public class GridCacheGateway<K, V> {
state.set(State.STOPPED);
}
finally {
- rwLock.writeUnlock();
+ rwLock.writeLock().unlock();
}
}
@@ -331,4 +335,4 @@ public class GridCacheGateway<K, V> {
/** */
STOPPED
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 924ce79..d20310b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -74,7 +75,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAwa
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
@@ -120,7 +121,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
private boolean stopping;
/** Mutex. */
- private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+ private final StripedCompositeReadWriteLock rw =
+ new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());
/** Deployment enabled. */
private boolean depEnabled;
@@ -316,7 +318,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
// Busy wait is intentional.
while (true) {
try {
- if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
+ if (rw.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
@@ -335,7 +337,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
stopping = true;
}
finally {
- rw.writeUnlock();
+ rw.writeLock().unlock();
}
}
@@ -347,7 +349,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
final IgniteBiInClosure<UUID, GridCacheMessage> c) {
- rw.readLock();
+ Lock lock = rw.readLock();
+
+ lock.lock();
try {
if (stopping) {
@@ -378,7 +382,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled)
cctx.deploy().ignoreOwnership(false);
- rw.readUnlock();
+ lock.unlock();
}
}
@@ -821,9 +825,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
*/
private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) {
try {
- // We will not end up with storing a bunch of new UUIDs
- // in each cache entry, since node ID is stored in NIO session
- // on handshake.
c.apply(nodeId, msg);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e274485..1c59390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -454,7 +454,7 @@ public class GridCacheUtils {
* that may have already left).
*/
public static Collection<ClusterNode> allNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().cacheNodes(ctx.namex(), topOrder);
+ return ctx.discovery().cacheNodes(ctx.cacheId(), topOrder);
}
/**
@@ -487,7 +487,7 @@ public class GridCacheUtils {
* @return All nodes on which cache with the same name is started.
*/
public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) {
- return ctx.discovery().cacheAffinityNodes(ctx.namex(), AffinityTopologyVersion.NONE);
+ return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), AffinityTopologyVersion.NONE);
}
/**
@@ -498,7 +498,7 @@ public class GridCacheUtils {
* @return Affinity nodes.
*/
public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
- return ctx.discovery().cacheAffinityNodes(ctx.namex(), topOrder);
+ return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), topOrder);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 11361a2..41b3281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -196,7 +196,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
int nodesCnt;
if (!cctx.isLocal())
- nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size();
+ nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.cacheId(), topVer).size();
else
nodesCnt = 1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 7c1f760..a1fbd72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -88,7 +88,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
private volatile boolean stopping;
/** A future that will be completed when topology with version topVer will be ready to use. */
- private GridDhtTopologyFuture topReadyFut;
+ private volatile GridDhtTopologyFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -216,16 +216,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
- lock.readLock().lock();
-
- try {
- assert topReadyFut != null;
+ assert topReadyFut != null;
- return topReadyFut;
- }
- finally {
- lock.readLock().unlock();
- }
+ return topReadyFut;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index d1e3780..b5cb5cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -78,9 +78,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
AffinityTopologyVersion topVer
) {
this.ctx = ctx;
- this.key = new T2<>(CU.cacheId(cacheName), topVer);
+ int cacheId = CU.cacheId(cacheName);
+ this.key = new T2<>(cacheId, topVer);
- Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer);
+ Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheId, topVer);
LinkedList<ClusterNode> tmp = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index f5865e6..1cd3cfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1199,8 +1199,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
if (expVer.equals(curVer))
return false;
- Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
- Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+ Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
+ Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
return true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 75a275c..966a186 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -99,7 +99,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
private volatile boolean stopping;
/** A future that will be completed when topology with version topVer will be ready to use. */
- private GridDhtTopologyFuture topReadyFut;
+ private volatile GridDhtTopologyFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -311,16 +311,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
- lock.readLock().lock();
-
- try {
- assert topReadyFut != null;
+ assert topReadyFut != null;
- return topReadyFut;
- }
- finally {
- lock.readLock().unlock();
- }
+ return topReadyFut;
}
/** {@inheritDoc} */
@@ -752,6 +745,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part != null)
list.add(part);
}
+
return list;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 1b175d0..4cb113e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -111,10 +111,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
- GridNearAtomicUpdateResponse updateRes) {
+ GridNearAtomicUpdateResponse updateRes
+ ) {
this.cctx = cctx;
- futVer = cctx.versions().next(updateReq.topologyVersion());
+ this.futVer = cctx.isLocalNode(updateRes.nodeId()) ?
+ cctx.versions().next(updateReq.topologyVersion()) : // Generate new if request mapped to local.
+ updateReq.futureVersion();
this.updateReq = updateReq;
this.completionCb = completionCb;
this.updateRes = updateRes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index f0bea07..deb9ce4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -134,7 +134,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
boolean addPrevVal,
int partId,
@Nullable CacheObject prevVal,
- @Nullable Long updateCntr
+ long updateCntr
);
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1b6179e..4745ff7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -1775,6 +1774,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
+ res.partition(req.partition());
+
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
GridDhtAtomicAbstractUpdateFuture dhtFut = null;
@@ -2435,7 +2436,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
boolean readersOnly = false;
@@ -2670,7 +2671,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = req.topologyVersion();
- boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
+ boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
CacheStorePartialUpdateException storeErr = null;
@@ -2996,7 +2997,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (GridCacheMapEntry entry : locked) {
if (entry != null && entry.deleted()) {
if (skip == null)
- skip = new HashSet<>(locked.size(), 1.0f);
+ skip = U.newHashSet(locked.size());
skip.add(entry.key());
}
@@ -3142,7 +3143,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
AffinityTopologyVersion topVer = updateReq.topologyVersion();
- Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
+ Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(ctx.cacheId(), topVer);
// We are on primary node for some key.
assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
@@ -3186,7 +3187,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']');
+ msgLog.debug("Received near atomic update response " +
+ "[futId=" + res.futureVersion() + ", node=" + nodeId + ']');
res.nodeId(ctx.localNodeId());
@@ -3217,6 +3219,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(),
ctx.deploymentEnabled());
+ res.partition(req.partition());
+
Boolean replicate = ctx.isDrEnabled();
boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 20d6e90..0dc2754 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -67,7 +67,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
) {
- super(cctx, completionCb, writeVer, updateReq, updateRes);
+ super(cctx,
+ completionCb,
+ writeVer,
+ updateReq,
+ updateRes);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 0af7cf5..a7e6c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -161,7 +161,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
boolean addPrevVal,
int partId,
@Nullable CacheObject prevVal,
- @Nullable Long updateCntr
+ long updateCntr
) {
assert entryProcessor == null;
assert ttl <= 0 : ttl;
@@ -177,8 +177,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
if (addPrevVal)
this.prevVal = prevVal;
- if (updateCntr != null)
- this.updateCntr = updateCntr;
+ this.updateCntr = updateCntr;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index efb35c4..5429adc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -66,7 +66,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
) {
- super(cctx, completionCb, writeVer, updateReq, updateRes);
+ super(cctx,
+ completionCb,
+ writeVer,
+ updateReq,
+ updateRes);
keys = new ArrayList<>(updateReq.size());
mappings = U.newHashMap(updateReq.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 1854e52..7144963 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -227,7 +227,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
boolean addPrevVal,
int partId,
@Nullable CacheObject prevVal,
- @Nullable Long updateCntr
+ long updateCntr
) {
keys.add(key);
@@ -248,12 +248,10 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
prevVals.add(prevVal);
}
- if (updateCntr != null) {
- if (updateCntrs == null)
- updateCntrs = new GridLongList();
+ if (updateCntrs == null)
+ updateCntrs = new GridLongList();
- updateCntrs.add(updateCntr);
- }
+ updateCntrs.add(updateCntr);
// In case there is no conflict, do not create the list.
if (conflictVer != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index ff12af0..c3d3ca9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -69,6 +69,9 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
@GridDirectCollection(KeyCacheObject.class)
private List<KeyCacheObject> nearEvicted;
+ /** Partition ID. */
+ private int partId = -1;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -157,6 +160,18 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
nearEvicted.add(key);
}
+ /**
+ * @param partId Partition ID to set.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partId;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -234,6 +249,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
writer.incrementState();
+ case 7:
+ if (!writer.writeInt("partId", partId))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -282,6 +303,14 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
reader.incrementState();
+ case 7:
+ partId = reader.readInt("partId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridDhtAtomicUpdateResponse.class);
@@ -294,7 +323,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 891a20c..0a816a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -379,12 +379,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
- cache.topology().readLock();
-
AffinityTopologyVersion topVer;
-
GridCacheVersion futVer;
+ cache.topology().readLock();
+
try {
if (cache.topology().stopping()) {
onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
@@ -454,7 +453,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
updVer = this.updVer;
if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ updVer = futVer;
if (log.isDebugEnabled())
log.debug("Assigned fast-map version for update on near node: " + updVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2315a18..f182ecb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -488,12 +488,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
- cache.topology().readLock();
-
AffinityTopologyVersion topVer;
-
GridCacheVersion futVer;
+ cache.topology().readLock();
+
try {
if (cache.topology().stopping()) {
onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
@@ -628,7 +627,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
updVer = this.updVer;
if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ updVer = futVer;
if (log.isDebugEnabled())
log.debug("Assigned fast-map version for update on near node: " + updVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 2e38733..22e01ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -105,6 +105,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Near expire times. */
private GridLongList nearExpireTimes;
+ /** Partition ID. */
+ private int partId = -1;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -154,6 +157,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
+ * @param partId Partition ID for proper striping on near node.
+ */
+ public void partition(int partId) {
+ this.partId = partId;
+ }
+
+ /**
* Sets update error.
*
* @param err Error.
@@ -431,6 +441,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/** {@inheritDoc} */
+ @Override public int partition() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
}
@@ -510,12 +525,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 12:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeInt("partId", partId))
return false;
writer.incrementState();
case 13:
+ if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
if (!writer.writeMessage("ret", ret))
return false;
@@ -610,7 +631,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 12:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+ partId = reader.readInt("partId");
if (!reader.isLastRead())
return false;
@@ -618,6 +639,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 13:
+ remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
ret = reader.readMessage("ret");
if (!reader.isLastRead())
@@ -637,7 +666,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 15;
}
/** {@inheritDoc} */