You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/03 14:05:10 UTC
[10/14] ignite git commit: IGNITE-5613 - Fixed deadlock on sequence
update inside transaction
IGNITE-5613 - Fixed deadlock on sequence update inside transaction
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7db925c1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7db925c1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7db925c1
Branch: refs/heads/ignite-gg-12389
Commit: 7db925c1c445260ffe45a8ede54d24d99db0fddd
Parents: b762417
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Thu Jun 29 17:11:39 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Jul 3 12:14:59 2017 +0300
----------------------------------------------------------------------
.../GridCacheAtomicSequenceImpl.java | 99 ++++++++++++--------
1 file changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db925c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 5a87e4a..31ec16f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -91,8 +92,14 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
/** Sequence batch size */
private volatile int batchSize;
- /** Synchronization lock. */
- private final Lock lock = new ReentrantLock();
+ /** Synchronization lock for local value updates. */
+ private final Lock localUpdate = new ReentrantLock();
+
+ /** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */
+ private final ReentrantLock distUpdateFreeTop = new ReentrantLock();
+
+ /** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */
+ private final ReentrantLock distUpdateLockedTop = new ReentrantLock();
/** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */
private final Callable<Long> incAndGetCall = internalUpdate(1, true);
@@ -214,7 +221,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
assert l > 0;
- lock.lock();
+ localUpdate.lock();
try {
// If reserved range isn't exhausted.
@@ -225,7 +232,24 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
return updated ? locVal0 + l : locVal0;
}
+ }
+ finally {
+ localUpdate.unlock();
+ }
+
+ AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null);
+
+ // We need two separate locks here because two independent thread may attempt to update the sequence
+ // simultaneously, one thread with locked topology and other with unlocked.
+ // We cannot use the same lock for both cases because it leads to a deadlock when free-topology thread
+ // waits for topology change, and locked topology thread waits to acquire the lock.
+ // If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time
+ // we do not want multiple threads to attempt to run identical cache updates.
+ ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop;
+
+ distLock.lock();
+ try {
if (updateCall == null)
updateCall = internalUpdate(l, updated);
@@ -240,7 +264,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
}
}
finally {
- lock.unlock();
+ distLock.unlock();
}
}
@@ -260,13 +284,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
@Override public void batchSize(int size) {
A.ensure(size > 0, " Batch size can't be less then 0: " + size);
- lock.lock();
+ localUpdate.lock();
try {
batchSize = size;
}
finally {
- lock.unlock();
+ localUpdate.unlock();
}
}
@@ -348,6 +372,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
private Callable<Long> internalUpdate(final long l, final boolean updated) {
return new Callable<Long>() {
@Override public Long call() throws Exception {
+ assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread();
+
try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = seqView.get(key);
@@ -359,46 +385,39 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
long newUpBound;
- lock.lock();
+ curLocVal = locVal;
- try {
- curLocVal = locVal;
+ // If local range was already reserved in another thread.
+ if (curLocVal + l <= upBound) {
+ locVal = curLocVal + l;
- // If local range was already reserved in another thread.
- if (curLocVal + l <= upBound) {
- locVal = curLocVal + l;
-
- return updated ? curLocVal + l : curLocVal;
- }
+ return updated ? curLocVal + l : curLocVal;
+ }
- long curGlobalVal = seq.get();
+ long curGlobalVal = seq.get();
- long newLocVal;
+ long newLocVal;
- /* We should use offset because we already reserved left side of range.*/
- long off = batchSize > 1 ? batchSize - 1 : 1;
+ /* We should use offset because we already reserved left side of range.*/
+ long off = batchSize > 1 ? batchSize - 1 : 1;
- // Calculate new values for local counter, global counter and upper bound.
- if (curLocVal + l >= curGlobalVal) {
- newLocVal = curLocVal + l;
+ // Calculate new values for local counter, global counter and upper bound.
+ if (curLocVal + l >= curGlobalVal) {
+ newLocVal = curLocVal + l;
- newUpBound = newLocVal + off;
- }
- else {
- newLocVal = curGlobalVal;
+ newUpBound = newLocVal + off;
+ }
+ else {
+ newLocVal = curGlobalVal;
- newUpBound = newLocVal + off;
- }
+ newUpBound = newLocVal + off;
+ }
- locVal = newLocVal;
- upBound = newUpBound;
+ locVal = newLocVal;
+ upBound = newUpBound;
- if (updated)
- curLocVal = newLocVal;
- }
- finally {
- lock.unlock();
- }
+ if (updated)
+ curLocVal = newLocVal;
// Global counter must be more than reserved upper bound.
seq.set(newUpBound + 1);
@@ -419,13 +438,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
}
/** {@inheritDoc} */
- @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
- this.seqView = kctx.cache().atomicsCache();
- this.ctx = seqView.context();
+ @Override public void onActivate(GridKernalContext kctx) {
+ seqView = kctx.cache().atomicsCache();
+ ctx = seqView.context();
}
/** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ @Override public void onDeActivate(GridKernalContext kctx) {
}