You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/02 08:25:38 UTC
ignite git commit: IGNITE-5757 - Rent partitions on exchange completion
Repository: ignite
Updated Branches:
refs/heads/master cb0349654 -> c6fbe2d82
IGNITE-5757 - Rent partitions on exchange completion
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c6fbe2d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c6fbe2d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c6fbe2d8
Branch: refs/heads/master
Commit: c6fbe2d82a9f56f96c94551b09e85a12d192f32e
Parents: cb03496
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Aug 2 11:25:08 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Aug 2 11:25:22 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 3 +++
.../dht/GridDhtPartitionTopology.java | 4 +++-
.../dht/GridDhtPartitionTopologyImpl.java | 21 +++++++++-------
.../distributed/dht/GridDhtTxPrepareFuture.java | 25 ++++++++++++++++++--
.../distributed/near/GridNearCacheEntry.java | 2 +-
.../near/GridNearTxPrepareRequest.java | 8 +++----
.../cache/transactions/IgniteTxManager.java | 3 ++-
.../IgniteRejectConnectOnNodeStopTest.java | 7 +++++-
.../testframework/junits/GridAbstractTest.java | 3 ++-
9 files changed, 57 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index edfa950..2ee80a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2088,6 +2088,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long expireTime = expireTimeExtras();
if (expireTime > 0 && (expireTime - U.currentTimeMillis() <= 0)) {
+ if (obsoleteVer == null)
+ obsoleteVer = nextVersion();
+
if (onExpired(this.val, obsoleteVer)) {
if (cctx.deferredDelete()) {
deferred = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 81d92e0..8688c4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -203,7 +203,9 @@ public interface GridDhtPartitionTopology {
* @param p Partition ID.
* @param affAssignment Assignments.
* @param affNodes Node assigned for given partition by affinity.
- * @return Collection of all nodes responsible for this partition with primary node being first.
+ * @return Collection of all nodes responsible for this partition with primary node being first. The first N
+ * elements of this collection (with N being 1 + backups) are actual DHT affinity nodes, other nodes
+ * are current additional owners of the partition after topology change.
*/
@Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a8e13a0..880a102 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -380,9 +380,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
}
-
- if (node2part != null && node2part.valid())
- checkEvictions(updateSeq, aff);
}
updateRebalanceVersion(aff);
@@ -624,7 +621,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- updateRebalanceVersion(grp.affinity().assignments(topVer));
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
+
+ updateRebalanceVersion(aff);
+
+ if (node2part != null && node2part.valid())
+ changed |= checkEvictions(updateSeq, aff);
consistencyCheck();
}
@@ -747,8 +749,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
}
else if (loc != null && state == RENTING && !showRenting)
- throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted " +
- "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
+ throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
+ "evicted [part=" + p + ", shouldBeMoving=" + loc.reload() + ", belongs=" + belongs +
+ ", topVer=" + topVer + ", curTopVer=" + this.topVer + "]");
if (loc == null) {
if (!belongs)
@@ -1312,7 +1315,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
- changed |= checkEvictions(updateSeq, aff);
+ if (exchangeVer == null)
+ changed |= checkEvictions(updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -1501,7 +1505,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
- changed |= checkEvictions(updateSeq, aff);
+ if (exchId == null)
+ changed |= checkEvictions(updateSeq, aff);
updateRebalanceVersion(aff);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a31c540..03d99fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1510,7 +1510,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
try {
List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
- assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+ assert !dhtNodes.isEmpty() && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
@@ -1531,7 +1531,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
ClusterNode readerNode = cctx.discovery().node(readerId);
- if (readerNode == null || dhtNodes.contains(readerNode))
+ if (readerNode == null || canSkipNearReader(dht, readerNode, dhtNodes))
continue;
if (log.isDebugEnabled())
@@ -1554,6 +1554,27 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
}
/**
+ * Checks whether mapping of an entry update to a near reader can be skipped. The update can be skipped only if
+ * the reader is a primary or a backup, i.e. one of the first {@code backups + 1} nodes in {@code dhtNodes}. A
+ * reader that owns the partition without being a primary or a backup must still get its near entry updated.
+ *
+ * @param dhtCache DHT cache whose configured number of backups bounds how many leading DHT nodes are checked.
+ * @param readerNode Reader node.
+ * @param dhtNodes Current DHT nodes (primary + backups first and other DHT nodes afterwards).
+ * @return {@code true} if the reader's ID equals the ID of one of the first {@code min(backups + 1, size)} nodes.
+ */
+ private boolean canSkipNearReader(GridDhtCacheAdapter<?, ?> dhtCache, ClusterNode readerNode, List<ClusterNode> dhtNodes) {
+ int limit = Math.min(dhtCache.configuration().getBackups() + 1, dhtNodes.size());
+
+ for (int i = 0; i < limit; i++) {
+ if (dhtNodes.get(i).id().equals(readerNode.id()))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @param entry Entry.
* @param n Node.
* @param globalMap Map.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 646281b..6e606bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -249,7 +249,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
// If we are here, then we already tried to evict this entry.
// If cannot evict, then update.
if (this.dhtVer == null) {
- if (!markObsolete(dhtVer)) {
+ if (!markObsolete(cctx.versions().next())) {
value(val);
ttlAndExpireTimeExtras((int) ttl, expireTime);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 29c7aad..875f397 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -406,13 +406,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
StringBuilder flags = new StringBuilder();
if (near())
- flags.append("near");
+ flags.append("[near]");
if (firstClientRequest())
- flags.append("clientReq");
+ flags.append("[firstClientReq]");
if (implicitSingle())
- flags.append("single");
+ flags.append("[implicitSingle]");
if (explicitLock())
- flags.append("explicitLock");
+ flags.append("[explicitLock]");
return S.toString(GridNearTxPrepareRequest.class, this,
"flags", flags.toString(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index cd68bc9..82692ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -863,7 +863,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
GridNearCacheEntry e = near.peekExx(entry.key());
- if (e != null && e.markObsoleteIfEmpty(tx.xidVersion()))
+ if (e != null && e.markObsoleteIfEmpty(null))
near.removeEntry(e);
}
}
@@ -1191,6 +1191,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
throw new IgniteCheckedException("Missing commit version (consider increasing " +
IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
+ ", committed0=" + committed0 +
", tx=" + tx.getClass().getSimpleName() + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
index d34de12..97d685f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
@@ -81,6 +81,11 @@ public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} This override calls {@code stopAllGrids(true)}. */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
/**
* @throws Exception If failed.
*/
@@ -126,7 +131,7 @@ public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest {
boolean err = false;
- try{
+ try {
stopStartLatch.await();
IgniteCacheMessageRecoveryAbstractTest.closeSessions(srv);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 9b99e01..4965d16 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -766,7 +766,8 @@ public abstract class GridAbstractTest extends TestCase {
Thread.sleep(1000);
}
- throw new Exception("Failed to wait for proper topology: " + cnt);
+ throw new Exception("Failed to wait for proper topology [expCnt=" + cnt +
+ ", actualTopology=" + grid(0).cluster().nodes() + ']');
}
/** */