You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/18 13:46:32 UTC

[47/62] [abbrv] ignite git commit: IGNITE-4985 - Do not acquire asyncOp semaphore for retry operations

IGNITE-4985 - Do not acquire asyncOp semaphore for retry operations


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b2fb9be1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b2fb9be1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b2fb9be1

Branch: refs/heads/ignite-4938
Commit: b2fb9be1d0be6ec765ce706c9264116b90b3ce3a
Parents: 6e12daa
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Apr 18 12:56:33 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 18 12:57:03 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 41 ++++++++++----------
 .../dht/atomic/GridDhtAtomicCache.java          |  6 +--
 .../dht/colocated/GridDhtColocatedCache.java    |  6 +--
 .../near/GridNearTransactionalCache.java        |  2 +-
 .../local/atomic/GridLocalAtomicCache.java      |  6 +--
 ...ridCacheReplicatedSynchronousCommitTest.java |  2 +-
 6 files changed, 31 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index b9fa6c9..b38e481 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2214,7 +2214,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         recovery,
                         needVer);
                 }
-            }, ctx.operationContextPerCall());
+            }, ctx.operationContextPerCall(), /*retry*/false);
         }
     }
 
@@ -3971,14 +3971,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         }
                     });
 
-                saveFuture(holder, f);
+                saveFuture(holder, f, /*retry*/false);
 
                 return f;
             }
 
             IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
 
-            saveFuture(holder, f);
+            saveFuture(holder, f, /*retry*/false);
 
             ctx.tm().resetContext();
 
@@ -4139,18 +4139,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     !skipStore,
                     0);
 
-                return asyncOp(tx, op, opCtx);
+                return asyncOp(tx, op, opCtx, /*retry*/false);
             }
             else {
                 AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, retries, opCtx);
 
-                fut.execute();
+                fut.execute(/*retry*/false);
 
                 return fut;
             }
         }
         else
-            return asyncOp(tx, op, opCtx);
+            return asyncOp(tx, op, opCtx, /*retry*/false);
     }
 
     /**
@@ -4164,9 +4164,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     protected <T> IgniteInternalFuture<T> asyncOp(
         GridNearTxLocal tx,
         final AsyncOp<T> op,
-        final CacheOperationContext opCtx
+        final CacheOperationContext opCtx,
+        final boolean retry
     ) {
-        IgniteInternalFuture<T> fail = asyncOpAcquire();
+        IgniteInternalFuture<T> fail = asyncOpAcquire(retry);
 
         if (fail != null)
             return fail;
@@ -4209,7 +4210,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         }
                     });
 
-                saveFuture(holder, f);
+                saveFuture(holder, f, retry);
 
                 return f;
             }
@@ -4233,7 +4234,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 }
             });
 
-            saveFuture(holder, f);
+            saveFuture(holder, f, retry);
 
             if (tx.implicit())
                 ctx.tm().resetContext();
@@ -4252,7 +4253,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param holder Future holder.
      * @param fut Future to save.
      */
-    protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut) {
+    protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut, final boolean retry) {
         assert holder != null;
         assert fut != null;
         assert holder.holdsLock();
@@ -4262,12 +4263,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (fut.isDone()) {
             holder.future(null);
 
-            asyncOpRelease();
+            asyncOpRelease(retry);
         }
         else {
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> f) {
-                    asyncOpRelease();
+                    asyncOpRelease(retry);
 
                     if (!holder.tryLock())
                         return;
@@ -4289,9 +4290,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      *
      * @return Failed future if waiting was interrupted.
      */
-    @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
+    @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire(boolean retry) {
         try {
-            if (asyncOpsSem != null)
+            if (!retry && asyncOpsSem != null)
                 asyncOpsSem.acquire();
 
             return null;
@@ -4307,8 +4308,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * Releases asynchronous operations permit, if limited.
      */
-    private void asyncOpRelease() {
-        if (asyncOpsSem != null)
+    private void asyncOpRelease(boolean retry) {
+        if (!retry && asyncOpsSem != null)
             asyncOpsSem.release();
     }
 
@@ -4775,7 +4776,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         /**
          *
          */
-        public void execute() {
+        public void execute(boolean retry) {
             tx = ctx.tm().newTx(
                 true,
                 op.single(),
@@ -4786,7 +4787,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 opCtx == null || !opCtx.skipStore(),
                 0);
 
-            IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx);
+            IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx, retry);
 
             fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() {
                 @Override public void apply(IgniteInternalFuture<T> fut) {
@@ -4816,7 +4817,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                         try {
                                             topFut.get();
 
-                                            execute();
+                                            execute(/*retry*/true);
                                         }
                                         catch (IgniteCheckedException e) {
                                             onDone(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 87a5536..2dacd12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -817,7 +817,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
-        IgniteInternalFuture<T> fail = asyncOpAcquire();
+        IgniteInternalFuture<T> fail = asyncOpAcquire(/*retry*/false);
 
         if (fail != null)
             return fail;
@@ -841,14 +841,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         }
                     });
 
-                saveFuture(holder, f);
+                saveFuture(holder, f, /*retry*/false);
 
                 return f;
             }
 
             IgniteInternalFuture<T> f = op.apply();
 
-            saveFuture(holder, f);
+            saveFuture(holder, f, /*retry*/false);
 
             return f;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f922d09..2292cb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTran
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -72,7 +71,6 @@ import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -241,7 +239,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         }
                     });
                 }
-            }, opCtx);
+            }, opCtx, /*retry*/false);
         }
 
         AffinityTopologyVersion topVer = tx == null ?
@@ -308,7 +306,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         recovery,
                         needVer);
                 }
-            }, opCtx);
+            }, opCtx, /*retry*/false);
         }
 
         AffinityTopologyVersion topVer = tx == null ?

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 1468e8a..cc90be0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -152,7 +152,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                         recovery,
                         needVer);
                 }
-            }, opCtx);
+            }, opCtx, /*retry*/false);
         }
 
         subjId = ctx.subjectIdPerCall(subjId, opCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index b8c0e36..dfbc5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1464,7 +1464,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
      */
     @SuppressWarnings("unchecked")
     private IgniteInternalFuture asyncOp(final Callable<?> op) {
-        IgniteInternalFuture fail = asyncOpAcquire();
+        IgniteInternalFuture fail = asyncOpAcquire(/*retry*/false);
 
         if (fail != null)
             return fail;
@@ -1484,14 +1484,14 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                         }
                     });
 
-                saveFuture(holder, f);
+                saveFuture(holder, f, /*retry*/false);
 
                 return f;
             }
 
             IgniteInternalFuture f = ctx.closures().callLocalSafe(op);
 
-            saveFuture(holder, f);
+            saveFuture(holder, f, /*retry*/false);
 
             return f;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
index 3c241b8..10d75ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
@@ -119,7 +119,7 @@ public class GridCacheReplicatedSynchronousCommitTest extends GridCommonAbstract
             for (TestCommunicationSpi commSpi : commSpis)
                 cnt += commSpi.messagesCount();
 
-            assert cnt == ADDITION_CACHE_NUMBER;
+            assertEquals(ADDITION_CACHE_NUMBER, cnt);
         }
         finally {
             stopAllGrids();