You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/02 09:03:20 UTC

[06/11] ignite git commit: IGNITE-5757 - Rent partitions on exchange completion

IGNITE-5757 - Rent partitions on exchange completion


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c6fbe2d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c6fbe2d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c6fbe2d8

Branch: refs/heads/ignite-5578
Commit: c6fbe2d82a9f56f96c94551b09e85a12d192f32e
Parents: cb03496
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Aug 2 11:25:08 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Aug 2 11:25:22 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  3 +++
 .../dht/GridDhtPartitionTopology.java           |  4 +++-
 .../dht/GridDhtPartitionTopologyImpl.java       | 21 +++++++++-------
 .../distributed/dht/GridDhtTxPrepareFuture.java | 25 ++++++++++++++++++--
 .../distributed/near/GridNearCacheEntry.java    |  2 +-
 .../near/GridNearTxPrepareRequest.java          |  8 +++----
 .../cache/transactions/IgniteTxManager.java     |  3 ++-
 .../IgniteRejectConnectOnNodeStopTest.java      |  7 +++++-
 .../testframework/junits/GridAbstractTest.java  |  3 ++-
 9 files changed, 57 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index edfa950..2ee80a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2088,6 +2088,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     long expireTime = expireTimeExtras();
 
                     if (expireTime > 0 && (expireTime - U.currentTimeMillis() <= 0)) {
+                        if (obsoleteVer == null)
+                            obsoleteVer = nextVersion();
+
                         if (onExpired(this.val, obsoleteVer)) {
                             if (cctx.deferredDelete()) {
                                 deferred = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 81d92e0..8688c4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -203,7 +203,9 @@ public interface GridDhtPartitionTopology {
      * @param p Partition ID.
      * @param affAssignment Assignments.
      * @param affNodes Node assigned for given partition by affinity.
-     * @return Collection of all nodes responsible for this partition with primary node being first.
+     * @return Collection of all nodes responsible for this partition with primary node being first. The first N
+     *      elements of this collection (with N being 1 + backups) are actual DHT affinity nodes, other nodes
+     *      are current additional owners of the partition after topology change.
      */
     @Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a8e13a0..880a102 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -380,9 +380,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     }
                 }
             }
-
-            if (node2part != null && node2part.valid())
-                checkEvictions(updateSeq, aff);
         }
 
         updateRebalanceVersion(aff);
@@ -624,7 +621,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            updateRebalanceVersion(grp.affinity().assignments(topVer));
+            List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
+
+            updateRebalanceVersion(aff);
+
+            if (node2part != null && node2part.valid())
+                changed |= checkEvictions(updateSeq, aff);
 
             consistencyCheck();
         }
@@ -747,8 +749,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
             }
             else if (loc != null && state == RENTING && !showRenting)
-                throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted " +
-                    "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
+                throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
+                    "evicted [part=" + p + ", shouldBeMoving=" + loc.reload() + ", belongs=" + belongs +
+                    ", topVer=" + topVer + ", curTopVer=" + this.topVer + "]");
 
             if (loc == null) {
                 if (!belongs)
@@ -1312,7 +1315,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
-                changed |= checkEvictions(updateSeq, aff);
+                if (exchangeVer == null)
+                    changed |= checkEvictions(updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1501,7 +1505,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
 
-                changed |= checkEvictions(updateSeq, aff);
+                if (exchId == null)
+                    changed |= checkEvictions(updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a31c540..03d99fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1510,7 +1510,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             try {
                 List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
 
-                assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+                assert !dhtNodes.isEmpty() && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
 
                 if (log.isDebugEnabled())
                     log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
@@ -1531,7 +1531,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
 
                         ClusterNode readerNode = cctx.discovery().node(readerId);
 
-                        if (readerNode == null || dhtNodes.contains(readerNode))
+                        if (readerNode == null || canSkipNearReader(dht, readerNode, dhtNodes))
                             continue;
 
                         if (log.isDebugEnabled())
@@ -1554,6 +1554,27 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     }
 
     /**
+     * This method checks if we should skip mapping of an entry update to the near reader. We can skip the update
+     * if the reader is a primary or a backup. If the reader is a partition owner, but not a primary or a backup,
+     * we cannot skip the reader update and must attempt to update a near entry anyway.
+     *
+     * @param dhtCache DHT cache to check mapping.
+     * @param readerNode Reader node.
+     * @param dhtNodes Current DHT nodes (primary + backups first and other DHT nodes afterwards).
+     * @return {@code true} if reader is either a primary or a backup.
+     */
+    private boolean canSkipNearReader(GridDhtCacheAdapter<?, ?> dhtCache, ClusterNode readerNode, List<ClusterNode> dhtNodes) {
+        int limit = Math.min(dhtCache.configuration().getBackups() + 1, dhtNodes.size());
+
+        for (int i = 0; i < limit; i++) {
+            if (dhtNodes.get(i).id().equals(readerNode.id()))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param entry Entry.
      * @param n Node.
      * @param globalMap Map.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 646281b..6e606bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -249,7 +249,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 // If we are here, then we already tried to evict this entry.
                 // If cannot evict, then update.
                 if (this.dhtVer == null) {
-                    if (!markObsolete(dhtVer)) {
+                    if (!markObsolete(cctx.versions().next())) {
                         value(val);
 
                         ttlAndExpireTimeExtras((int) ttl, expireTime);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 29c7aad..875f397 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -406,13 +406,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         StringBuilder flags = new StringBuilder();
 
         if (near())
-            flags.append("near");
+            flags.append("[near]");
         if (firstClientRequest())
-            flags.append("clientReq");
+            flags.append("[firstClientReq]");
         if (implicitSingle())
-            flags.append("single");
+            flags.append("[implicitSingle]");
         if (explicitLock())
-            flags.append("explicitLock");
+            flags.append("[explicitLock]");
 
         return S.toString(GridNearTxPrepareRequest.class, this,
             "flags", flags.toString(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index cd68bc9..82692ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -863,7 +863,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     GridNearCacheEntry e = near.peekExx(entry.key());
 
-                    if (e != null && e.markObsoleteIfEmpty(tx.xidVersion()))
+                    if (e != null && e.markObsoleteIfEmpty(null))
                         near.removeEntry(e);
                 }
             }
@@ -1191,6 +1191,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
             throw new IgniteCheckedException("Missing commit version (consider increasing " +
                 IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
+                ", committed0=" + committed0 +
                 ", tx=" + tx.getClass().getSimpleName() + ']');
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
index d34de12..97d685f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
@@ -81,6 +81,11 @@ public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -126,7 +131,7 @@ public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest {
 
         boolean err = false;
 
-        try{
+        try {
             stopStartLatch.await();
 
             IgniteCacheMessageRecoveryAbstractTest.closeSessions(srv);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 9b99e01..4965d16 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -766,7 +766,8 @@ public abstract class GridAbstractTest extends TestCase {
                 Thread.sleep(1000);
         }
 
-        throw new Exception("Failed to wait for proper topology: " + cnt);
+        throw new Exception("Failed to wait for proper topology [expCnt=" + cnt +
+            ", actualTopology=" + grid(0).cluster().nodes() + ']');
     }
 
     /** */