You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/04 08:19:45 UTC

[22/28] ignite git commit: IGNITE-5613 - Fixed deadlock on sequence update inside transaction

IGNITE-5613 - Fixed deadlock on sequence update inside transaction


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7db925c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7db925c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7db925c1

Branch: refs/heads/ignite-2.1.2-exchange
Commit: 7db925c1c445260ffe45a8ede54d24d99db0fddd
Parents: b762417
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Jun 29 17:11:39 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jul 3 12:14:59 2017 +0300

----------------------------------------------------------------------
 .../GridCacheAtomicSequenceImpl.java            | 99 ++++++++++++--------
 1 file changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7db925c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 5a87e4a..31ec16f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -91,8 +92,14 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     /**  Sequence batch size */
     private volatile int batchSize;
 
-    /** Synchronization lock. */
-    private final Lock lock = new ReentrantLock();
+    /** Synchronization lock for local value updates. */
+    private final Lock localUpdate = new ReentrantLock();
+
+    /** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */
+    private final ReentrantLock distUpdateFreeTop = new ReentrantLock();
+
+    /** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */
+    private final ReentrantLock distUpdateLockedTop = new ReentrantLock();
 
     /** Callable for execution {@link #incrementAndGet} operation in async and sync mode.  */
     private final Callable<Long> incAndGetCall = internalUpdate(1, true);
@@ -214,7 +221,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
 
         assert l > 0;
 
-        lock.lock();
+        localUpdate.lock();
 
         try {
             // If reserved range isn't exhausted.
@@ -225,7 +232,24 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
 
                 return updated ? locVal0 + l : locVal0;
             }
+        }
+        finally {
+            localUpdate.unlock();
+        }
+
+        AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null);
+
+        // We need two separate locks here because two independent thread may attempt to update the sequence
+        // simultaneously, one thread with locked topology and other with unlocked.
+        // We cannot use the same lock for both cases because it leads to a deadlock when free-topology thread
+        // waits for topology change, and locked topology thread waits to acquire the lock.
+        // If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time
+        // we do not want multiple threads to attempt to run identical cache updates.
+        ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop;
+
+        distLock.lock();
 
+        try {
             if (updateCall == null)
                 updateCall = internalUpdate(l, updated);
 
@@ -240,7 +264,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
             }
         }
         finally {
-            lock.unlock();
+            distLock.unlock();
         }
     }
 
@@ -260,13 +284,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     @Override public void batchSize(int size) {
         A.ensure(size > 0, " Batch size can't be less then 0: " + size);
 
-        lock.lock();
+        localUpdate.lock();
 
         try {
             batchSize = size;
         }
         finally {
-            lock.unlock();
+            localUpdate.unlock();
         }
     }
 
@@ -348,6 +372,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     private Callable<Long> internalUpdate(final long l, final boolean updated) {
         return new Callable<Long>() {
             @Override public Long call() throws Exception {
+                assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread();
+
                 try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicSequenceValue seq = seqView.get(key);
 
@@ -359,46 +385,39 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
 
                     long newUpBound;
 
-                    lock.lock();
+                    curLocVal = locVal;
 
-                    try {
-                        curLocVal = locVal;
+                    // If local range was already reserved in another thread.
+                    if (curLocVal + l <= upBound) {
+                        locVal = curLocVal + l;
 
-                        // If local range was already reserved in another thread.
-                        if (curLocVal + l <= upBound) {
-                            locVal = curLocVal + l;
-
-                            return updated ? curLocVal + l : curLocVal;
-                        }
+                        return updated ? curLocVal + l : curLocVal;
+                    }
 
-                        long curGlobalVal = seq.get();
+                    long curGlobalVal = seq.get();
 
-                        long newLocVal;
+                    long newLocVal;
 
-                        /* We should use offset because we already reserved left side of range.*/
-                        long off = batchSize > 1 ? batchSize - 1 : 1;
+                    /* We should use offset because we already reserved left side of range.*/
+                    long off = batchSize > 1 ? batchSize - 1 : 1;
 
-                        // Calculate new values for local counter, global counter and upper bound.
-                        if (curLocVal + l >= curGlobalVal) {
-                            newLocVal = curLocVal + l;
+                    // Calculate new values for local counter, global counter and upper bound.
+                    if (curLocVal + l >= curGlobalVal) {
+                        newLocVal = curLocVal + l;
 
-                            newUpBound = newLocVal + off;
-                        }
-                        else {
-                            newLocVal = curGlobalVal;
+                        newUpBound = newLocVal + off;
+                    }
+                    else {
+                        newLocVal = curGlobalVal;
 
-                            newUpBound = newLocVal + off;
-                        }
+                        newUpBound = newLocVal + off;
+                    }
 
-                        locVal = newLocVal;
-                        upBound = newUpBound;
+                    locVal = newLocVal;
+                    upBound = newUpBound;
 
-                        if (updated)
-                            curLocVal = newLocVal;
-                    }
-                    finally {
-                        lock.unlock();
-                    }
+                    if (updated)
+                        curLocVal = newLocVal;
 
                     // Global counter must be more than reserved upper bound.
                     seq.set(newUpBound + 1);
@@ -419,13 +438,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     }
 
     /** {@inheritDoc} */
-    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
-        this.seqView = kctx.cache().atomicsCache();
-        this.ctx = seqView.context();
+    @Override public void onActivate(GridKernalContext kctx) {
+        seqView = kctx.cache().atomicsCache();
+        ctx = seqView.context();
     }
 
     /** {@inheritDoc} */
-    @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+    @Override public void onDeActivate(GridKernalContext kctx) {
 
     }