You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/06/29 07:32:20 UTC

[ignite] branch master updated: IGNITE-15017 Contention in lock on Compound future (#9201)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dddeca  IGNITE-15017 Contention in lock on Compound future (#9201)
0dddeca is described below

commit 0dddeca1bdde5c7e65bd5bbdf866adf50637b83a
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Jun 29 10:32:04 2021 +0300

    IGNITE-15017 Contention in lock on Compound future (#9201)
---
 .../distributed/GridCacheTxRecoveryFuture.java     |   7 +-
 .../cache/distributed/dht/GridDhtLockFuture.java   |   7 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java    |   7 +-
 .../dht/colocated/GridDhtColocatedLockFuture.java  |   7 +-
 .../cache/distributed/near/GridNearLockFuture.java |   7 +-
 ...dNearOptimisticSerializableTxPrepareFuture.java |   7 +-
 .../near/GridNearOptimisticTxPrepareFuture.java    |  21 +++-
 .../near/GridNearPessimisticTxPrepareFuture.java   |   7 +-
 .../distributed/near/GridNearTxFinishFuture.java   |  19 ++--
 .../near/GridNearTxQueryEnlistFuture.java          |  13 ++-
 .../internal/util/future/GridCompoundFuture.java   | 117 +++++++++++++++------
 .../metastorage/DistributedMetaStorageTest.java    |   4 +-
 12 files changed, 168 insertions(+), 55 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 43126b8..16b4fce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -417,7 +417,9 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
      */
     private MiniFuture miniFuture(IgniteUuid miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -437,6 +439,9 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 06e554c..c9b779c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -584,7 +584,9 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
      */
     private MiniFuture miniFuture(IgniteUuid miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -599,6 +601,9 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d04baea..ccadd5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -618,7 +618,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
      */
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -638,6 +640,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index d7e9922..03a9d62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -550,7 +550,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
     @SuppressWarnings({"IfMayBeConditional"})
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -570,6 +572,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 14767e7..9149eec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -577,7 +577,9 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
     @SuppressWarnings({"IfMayBeConditional"})
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -597,6 +599,9 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 3154114..f7c04c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -237,7 +237,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
      */
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -257,6 +259,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 1de3789..47b75eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -197,7 +197,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      * @return Keys for which {@code MiniFuture} isn't completed.
      */
     public Set<IgniteTxKey> requestedKeys() {
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             for (int i = 0; i < size; i++) {
@@ -217,6 +219,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
@@ -229,7 +234,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      */
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -249,6 +256,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
@@ -718,7 +728,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 if (keyLockFut != null)
                     keys = new HashSet<>(keyLockFut.lockKeys);
                 else {
-                    synchronized (this) {
+                    compoundsReadLock();
+
+                    try {
                         int size = futuresCountNoLock();
 
                         for (int i = 0; i < size; i++) {
@@ -738,6 +750,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                             }
                         }
                     }
+                    finally {
+                        compoundsReadUnlock();
+                    }
                 }
 
                 add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception, Object>() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 5300ad3..1135dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -148,7 +148,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
      */
     private MiniFuture miniFuture(int miniId) {
         // We iterate directly over the futs collection here to avoid copy.
-        synchronized (this) {
+        compoundsReadLock();
+
+        try {
             int size = futuresCountNoLock();
 
             // Avoid iterator creation.
@@ -167,6 +169,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 }
             }
         }
+        finally {
+            compoundsReadUnlock();
+        }
 
         return null;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 7a4cf40..e74d8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -208,7 +208,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
         if (!isDone()) {
             FinishMiniFuture finishFut = null;
 
-            synchronized (this) {
+            compoundsReadLock();
+
+            try {
                 int size = futuresCountNoLock();
 
                 for (int i = 0; i < size; i++) {
@@ -227,6 +229,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                     }
                 }
             }
+            finally {
+                compoundsReadUnlock();
+            }
 
             if (finishFut != null)
                 finishFut.onNearFinishResponse(res);
@@ -1015,15 +1020,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                         Collection<UUID> backups = txNodes.get(nodeId);
 
                         if (!F.isEmpty(backups)) {
-                            final CheckRemoteTxMiniFuture mini;
-
-                            synchronized (GridNearTxFinishFuture.this) {
+                            CheckRemoteTxMiniFuture mini = (CheckRemoteTxMiniFuture)compoundsLockedExclusively(() -> {
                                 int futId = Integer.MIN_VALUE + futuresCountNoLock();
 
-                                mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
+                                CheckRemoteTxMiniFuture miniFut = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
 
-                                add(mini);
-                            }
+                                add(miniFut);
+
+                                return miniFut;
+                            });
 
                             GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index 3ef2d36..77111da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -273,10 +273,17 @@ public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFu
      * @param miniId Mini ID to find.
      * @return Mini future.
      */
-    private synchronized MiniFuture miniFuture(int miniId) {
-        IgniteInternalFuture<Long> fut = future(Math.abs(miniId) - 1);
+    private MiniFuture miniFuture(int miniId) {
+        compoundsReadLock();
 
-        return !fut.isDone() ? (MiniFuture)fut : null;
+        try {
+            IgniteInternalFuture<Long> fut = future(Math.abs(miniId) - 1);
+
+            return !fut.isDone() ? (MiniFuture)fut : null;
+        }
+        finally {
+            compoundsReadUnlock();
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 6eb3e4a..bca24ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -22,6 +22,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
@@ -72,6 +74,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     @SuppressWarnings("unused")
     private volatile int lsnrCalls;
 
+    /** The lock responds for a consistency of compounds. */
+    private ReentrantReadWriteLock compoundsLock = new ReentrantReadWriteLock();
+
+    /** Count of compounds in the future. */
+    private volatile int size;
+
     /**
      * Default constructor.
      */
@@ -157,18 +165,55 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     }
 
     /**
+     * Locks compound to read.
+     */
+    protected void compoundsReadLock() {
+        compoundsLock.readLock().lock();
+    }
+
+    /**
+     * Unlocks compound to read.
+     */
+    protected void compoundsReadUnlock() {
+        compoundsLock.readLock().unlock();
+    }
+
+    /**
+     * Locks compounds list and executes code in {@code supplier}, when the lock holds.
+     *
+     * @param supplier Closure to execute some code when the compounds are locked exclusively.
+     * @return A result of the {@code supplier}.
+     */
+    protected Object compoundsLockedExclusively(Supplier<Object> supplier) {
+        compoundsLock.writeLock().lock();
+        try {
+            return supplier.get();
+        }
+        finally {
+            compoundsLock.writeLock().unlock();
+        }
+    }
+
+    /**
      * Gets collection of futures.
      *
      * @return Collection of futures.
      */
-    public final synchronized Collection<IgniteInternalFuture<T>> futures() {
-        if (futs == null)
-            return Collections.emptyList();
+    public final Collection<IgniteInternalFuture<T>> futures() {
+        compoundsLock.readLock().lock();
 
-        if (futs instanceof IgniteInternalFuture)
-            return Collections.singletonList((IgniteInternalFuture<T>)futs);
+        try {
+            if (futs == null)
+                return Collections.emptyList();
+
+            if (futs instanceof IgniteInternalFuture)
+                return Collections.singletonList((IgniteInternalFuture<T>)futs);
 
-        return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
+            return new ArrayList<>((Collection<IgniteInternalFuture<T>>)futs);
+        }
+        finally {
+            compoundsLock.readLock().unlock();
+        }
     }
 
     /**
@@ -200,19 +245,24 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      * @return {@code True} if there are pending futures.
      */
     protected final boolean hasPending() {
-        synchronized (this) {
-            int size = futuresCountNoLock();
+        compoundsLock.readLock().lock();
+
+        try {
+            int size0 = size;
 
             // Avoid iterator creation and collection copy.
-            for (int i = 0; i < size; i++) {
+            for (int i = 0; i < size0; i++) {
                 IgniteInternalFuture<T> fut = future(i);
 
                 if (!fut.isDone())
                     return true;
             }
-        }
 
-        return false;
+            return false;
+        }
+        finally {
+            compoundsLock.readLock().unlock();
+        }
     }
 
     /**
@@ -223,7 +273,9 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     public final GridCompoundFuture<T, R> add(IgniteInternalFuture<T> fut) {
         assert fut != null;
 
-        synchronized (this) {
+        compoundsLock.writeLock().lock();
+
+        try {
             if (futs == null)
                 futs = fut;
             else if (futs instanceof IgniteInternalFuture) {
@@ -236,6 +288,11 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
             }
             else
                 ((Collection<IgniteInternalFuture>)futs).add(fut);
+
+            size++;
+        }
+        finally {
+            compoundsLock.writeLock().unlock();
         }
 
         fut.listen(this);
@@ -255,8 +312,16 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     /**
      * Clear futures.
      */
-    protected final synchronized void clear() {
-        futs = null;
+    protected final void clear() {
+        compoundsLock.writeLock().lock();
+
+        try {
+            futs = null;
+            size = 0;
+        }
+        finally {
+            compoundsLock.writeLock().unlock();
+        }
     }
 
     /**
@@ -281,7 +346,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      * Check completeness of the future.
      */
     private void checkComplete() {
-        if (initialized() && !isDone() && lsnrCalls == futuresCount()) {
+        if (initialized() && !isDone() && lsnrCalls == size) {
             try {
                 onDone(rdc != null ? rdc.reduce() : null);
             }
@@ -326,8 +391,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      */
     @SuppressWarnings("unchecked")
     protected final IgniteInternalFuture<T> future(int idx) {
-        assert Thread.holdsLock(this);
-        assert futs != null && idx >= 0 && idx < futuresCountNoLock();
+        assert futs != null && idx >= 0 && idx < size;
 
         if (futs instanceof IgniteInternalFuture) {
             assert idx == 0;
@@ -342,28 +406,13 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
      * @return Futures size.
      */
     protected final int futuresCountNoLock() {
-        assert Thread.holdsLock(this);
-
-        if (futs == null)
-            return 0;
-
-        if (futs instanceof IgniteInternalFuture)
-            return 1;
-
-        return ((Collection<IgniteInternalFuture>)futs).size();
-    }
-
-    /**
-     * @return Futures size.
-     */
-    private synchronized int futuresCount() {
-        return futuresCountNoLock();
+        return size;
     }
 
     /**
      * @return {@code True} if has at least one future.
      */
-    protected final synchronized boolean hasFutures() {
+    protected final boolean hasFutures() {
         return futs != null;
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
index a02ae06..30ce39e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStorageTest.java
@@ -173,7 +173,9 @@ public class DistributedMetaStorageTest extends GridCommonAbstractTest {
             try {
                 IgniteKernal clientGrid = IgnitionEx.gridx(clientName);
 
-                return clientGrid != null && clientGrid.context().distributedMetastorage() != null;
+                return clientGrid != null
+                    && clientGrid.context().distributedMetastorage() != null
+                    && clientGrid.context().discovery().localNode() != null;
             }
             catch (Exception ignored) {
                 return false;