You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/29 07:32:20 UTC
[ignite] branch master updated: IGNITE-15017 Contention in lock on
Compound future (#9201)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0dddeca IGNITE-15017 Contention in lock on Compound future (#9201)
0dddeca is described below
commit 0dddeca1bdde5c7e65bd5bbdf866adf50637b83a
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Jun 29 10:32:04 2021 +0300
IGNITE-15017 Contention in lock on Compound future (#9201)
---
.../distributed/GridCacheTxRecoveryFuture.java | 7 +-
.../cache/distributed/dht/GridDhtLockFuture.java | 7 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 7 +-
.../dht/colocated/GridDhtColocatedLockFuture.java | 7 +-
.../cache/distributed/near/GridNearLockFuture.java | 7 +-
...dNearOptimisticSerializableTxPrepareFuture.java | 7 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 21 +++-
.../near/GridNearPessimisticTxPrepareFuture.java | 7 +-
.../distributed/near/GridNearTxFinishFuture.java | 19 ++--
.../near/GridNearTxQueryEnlistFuture.java | 13 ++-
.../internal/util/future/GridCompoundFuture.java | 117 +++++++++++++++------
.../metastorage/DistributedMetaStorageTest.java | 4 +-
12 files changed, 168 insertions(+), 55 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 43126b8..16b4fce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -417,7 +417,9 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
*/
private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -437,6 +439,9 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 06e554c..c9b779c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -584,7 +584,9 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
*/
private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -599,6 +601,9 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d04baea..ccadd5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -618,7 +618,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
*/
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -638,6 +640,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index d7e9922..03a9d62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -550,7 +550,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
@SuppressWarnings({"IfMayBeConditional"})
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -570,6 +572,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 14767e7..9149eec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -577,7 +577,9 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
@SuppressWarnings({"IfMayBeConditional"})
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -597,6 +599,9 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 3154114..f7c04c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -237,7 +237,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
*/
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -257,6 +259,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 1de3789..47b75eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -197,7 +197,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @return Keys for which {@code MiniFuture} isn't completed.
*/
public Set<IgniteTxKey> requestedKeys() {
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -217,6 +219,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
@@ -229,7 +234,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*/
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -249,6 +256,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
@@ -718,7 +728,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (keyLockFut != null)
keys = new HashSet<>(keyLockFut.lockKeys);
else {
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -738,6 +750,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
}
add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception, Object>() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 5300ad3..1135dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -148,7 +148,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
*/
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -167,6 +169,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 7a4cf40..e74d8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -208,7 +208,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
if (!isDone()) {
FinishMiniFuture finishFut = null;
- synchronized (this) {
+ compoundsReadLock();
+
+ try {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -227,6 +229,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
}
}
}
+ finally {
+ compoundsReadUnlock();
+ }
if (finishFut != null)
finishFut.onNearFinishResponse(res);
@@ -1015,15 +1020,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
Collection<UUID> backups = txNodes.get(nodeId);
if (!F.isEmpty(backups)) {
- final CheckRemoteTxMiniFuture mini;
-
- synchronized (GridNearTxFinishFuture.this) {
+ CheckRemoteTxMiniFuture mini = (CheckRemoteTxMiniFuture)compoundsLockedExclusively(() -> {
int futId = Integer.MIN_VALUE + futuresCountNoLock();
- mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
+ CheckRemoteTxMiniFuture miniFut = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
- add(mini);
- }
+ add(miniFut);
+
+ return miniFut;
+ });
GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index 3ef2d36..77111da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -273,10 +273,17 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
* @param miniId Mini ID to find.
* @return Mini future.
*/
- private synchronized MiniFuture miniFuture(int miniId) {
- IgniteInternalFuture<Long> fut = future(Math.abs(miniId) - 1);
+ private MiniFuture miniFuture(int miniId) {
+ compoundsReadLock();
- return !fut.isDone() ? (MiniFuture)fut : null;
+ try {
+ IgniteInternalFuture<Long> fut = future(Math.abs(miniId) - 1);
+
+ return !fut.isDone() ? (MiniFuture)fut : null;
+ }
+ finally {
+ compoundsReadUnlock();
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 6eb3e4a..bca24ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -22,6 +22,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
@@ -72,6 +74,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
@SuppressWarnings("unused")
private volatile int lsnrCalls;
+ /** The lock responds for a consistency of compounds. */
+ private ReentrantReadWriteLock compoundsLock = new ReentrantReadWriteLock();
+
+ /** Count of compounds in the future. */
+ private volatile int size;
+
/**
* Default constructor.
*/
@@ -157,18 +165,55 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
}
/**
+ * Locks compound to read.
+ */
+ protected void compoundsReadLock() {
+ compoundsLock.readLock().lock();
+ }
+
+ /**
+ * Unlocks compound to read.
+ */
+ protected void compoundsReadUnlock() {
+ compoundsLock.readLock().unlock();
+ }
+
+ /**
+ * Locks compounds list and executes code in {@code supplier}, when the lock holds.
+ *
+ * @param supplier Closure to execute some code when the compounds are locked exclusively.
+ * @return A result of the {@code supplier}.
+ */
+ protected Object compoundsLockedExclusively(Supplier<Object> supplier) {
+ compoundsLock.writeLock().lock();
+ try {
+ return supplier.get();
+ }
+ finally {
+ compoundsLock.writeLock().unlock();
+ }
+ }
+
+ /**
* Gets collection of futures.
*
* @return Collection of futures.
*/
- public final synchronized Collection<IgniteInternalFuture<T>> futures() {
- if (futs == null)
- return Collections.emptyList();
+ public final Collection<IgniteInternalFuture<T>> futures() {
+ compoundsLock.readLock().lock();
- if (futs instanceof IgniteInternalFuture)
- return Collections.singletonList((IgniteInternalFuture<T>)futs);
+ try {
+ if (futs == null)
+ return Collections.emptyList();
+
+ if (futs instanceof IgniteInternalFuture)
+ return Collections.singletonList((IgniteInternalFuture<T>)futs);
- return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
+ return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
+ }
+ finally {
+ compoundsLock.readLock().unlock();
+ }
}
/**
@@ -200,19 +245,24 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
* @return {@code True} if there are pending futures.
*/
protected final boolean hasPending() {
- synchronized (this) {
- int size = futuresCountNoLock();
+ compoundsLock.readLock().lock();
+
+ try {
+ int size0 = size;
// Avoid iterator creation and collection copy.
- for (int i = 0; i < size; i++) {
+ for (int i = 0; i < size0; i++) {
IgniteInternalFuture<T> fut = future(i);
if (!fut.isDone())
return true;
}
- }
- return false;
+ return false;
+ }
+ finally {
+ compoundsLock.readLock().unlock();
+ }
}
/**
@@ -223,7 +273,9 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
public final GridCompoundFuture<T, R> add(IgniteInternalFuture<T> fut) {
assert fut != null;
- synchronized (this) {
+ compoundsLock.writeLock().lock();
+
+ try {
if (futs == null)
futs = fut;
else if (futs instanceof IgniteInternalFuture) {
@@ -236,6 +288,11 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
}
else
((Collection<IgniteInternalFuture>)futs).add(fut);
+
+ size++;
+ }
+ finally {
+ compoundsLock.writeLock().unlock();
}
fut.listen(this);
@@ -255,8 +312,16 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
/**
* Clear futures.
*/
- protected final synchronized void clear() {
- futs = null;
+ protected final void clear() {
+ compoundsLock.writeLock().lock();
+
+ try {
+ futs = null;
+ size = 0;
+ }
+ finally {
+ compoundsLock.writeLock().unlock();
+ }
}
/**
@@ -281,7 +346,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
* Check completeness of the future.
*/
private void checkComplete() {
- if (initialized() && !isDone() && lsnrCalls == futuresCount()) {
+ if (initialized() && !isDone() && lsnrCalls == size) {
try {
onDone(rdc != null ? rdc.reduce() : null);
}
@@ -326,8 +391,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
*/
@SuppressWarnings("unchecked")
protected final IgniteInternalFuture<T> future(int idx) {
- assert Thread.holdsLock(this);
- assert futs != null && idx >= 0 && idx < futuresCountNoLock();
+ assert futs != null && idx >= 0 && idx < size;
if (futs instanceof IgniteInternalFuture) {
assert idx == 0;
@@ -342,28 +406,13 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
* @return Futures size.
*/
protected final int futuresCountNoLock() {
- assert Thread.holdsLock(this);
-
- if (futs == null)
- return 0;
-
- if (futs instanceof IgniteInternalFuture)
- return 1;
-
- return ((Collection<IgniteInternalFuture>)futs).size();
- }
-
- /**
- * @return Futures size.
- */
- private synchronized int futuresCount() {
- return futuresCountNoLock();
+ return size;
}
/**
* @return {@code True} if has at least one future.
*/
- protected final synchronized boolean hasFutures() {
+ protected final boolean hasFutures() {
return futs != null;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
index a02ae06..30ce39e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -173,7 +173,9 @@ public class DistributedMetaStorageTest extends GridCommonAbstractTest {
try {
IgniteKernal clientGrid = IgnitionEx.gridx(clientName);
- return clientGrid != null && clientGrid.context().distributedMetastorage() != null;
+ return clientGrid != null
+ && clientGrid.context().distributedMetastorage() != null
+ && clientGrid.context().discovery().localNode() != null;
}
catch (Exception ignored) {
return false;