You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/18 15:04:28 UTC
[29/46] ignite git commit: IGNITE-4985 - Do not acquire asyncOp semaphore for retry operations
IGNITE-4985 - Do not acquire asyncOp semaphore for retry operations
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b2fb9be1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b2fb9be1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b2fb9be1
Branch: refs/heads/ignite-1561-1
Commit: b2fb9be1d0be6ec765ce706c9264116b90b3ce3a
Parents: 6e12daa
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Apr 18 12:56:33 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 18 12:57:03 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 41 ++++++++++----------
.../dht/atomic/GridDhtAtomicCache.java | 6 +--
.../dht/colocated/GridDhtColocatedCache.java | 6 +--
.../near/GridNearTransactionalCache.java | 2 +-
.../local/atomic/GridLocalAtomicCache.java | 6 +--
...ridCacheReplicatedSynchronousCommitTest.java | 2 +-
6 files changed, 31 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index b9fa6c9..b38e481 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2214,7 +2214,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
recovery,
needVer);
}
- }, ctx.operationContextPerCall());
+ }, ctx.operationContextPerCall(), /*retry*/false);
}
}
@@ -3971,14 +3971,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
});
- saveFuture(holder, f);
+ saveFuture(holder, f, /*retry*/false);
return f;
}
IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
- saveFuture(holder, f);
+ saveFuture(holder, f, /*retry*/false);
ctx.tm().resetContext();
@@ -4139,18 +4139,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
!skipStore,
0);
- return asyncOp(tx, op, opCtx);
+ return asyncOp(tx, op, opCtx, /*retry*/false);
}
else {
AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, retries, opCtx);
- fut.execute();
+ fut.execute(/*retry*/false);
return fut;
}
}
else
- return asyncOp(tx, op, opCtx);
+ return asyncOp(tx, op, opCtx, /*retry*/false);
}
/**
@@ -4164,9 +4164,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected <T> IgniteInternalFuture<T> asyncOp(
GridNearTxLocal tx,
final AsyncOp<T> op,
- final CacheOperationContext opCtx
+ final CacheOperationContext opCtx,
+ final boolean retry
) {
- IgniteInternalFuture<T> fail = asyncOpAcquire();
+ IgniteInternalFuture<T> fail = asyncOpAcquire(retry);
if (fail != null)
return fail;
@@ -4209,7 +4210,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
});
- saveFuture(holder, f);
+ saveFuture(holder, f, retry);
return f;
}
@@ -4233,7 +4234,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
});
- saveFuture(holder, f);
+ saveFuture(holder, f, retry);
if (tx.implicit())
ctx.tm().resetContext();
@@ -4252,7 +4253,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param holder Future holder.
* @param fut Future to save.
*/
- protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut) {
+ protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut, final boolean retry) {
assert holder != null;
assert fut != null;
assert holder.holdsLock();
@@ -4262,12 +4263,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (fut.isDone()) {
holder.future(null);
- asyncOpRelease();
+ asyncOpRelease(retry);
}
else {
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
- asyncOpRelease();
+ asyncOpRelease(retry);
if (!holder.tryLock())
return;
@@ -4289,9 +4290,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*
* @return Failed future if waiting was interrupted.
*/
- @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
+ @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire(boolean retry) {
try {
- if (asyncOpsSem != null)
+ if (!retry && asyncOpsSem != null)
asyncOpsSem.acquire();
return null;
@@ -4307,8 +4308,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* Releases asynchronous operations permit, if limited.
*/
- private void asyncOpRelease() {
- if (asyncOpsSem != null)
+ private void asyncOpRelease(boolean retry) {
+ if (!retry && asyncOpsSem != null)
asyncOpsSem.release();
}
@@ -4775,7 +4776,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
*
*/
- public void execute() {
+ public void execute(boolean retry) {
tx = ctx.tm().newTx(
true,
op.single(),
@@ -4786,7 +4787,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
opCtx == null || !opCtx.skipStore(),
0);
- IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx);
+ IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx, retry);
fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() {
@Override public void apply(IgniteInternalFuture<T> fut) {
@@ -4816,7 +4817,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
try {
topFut.get();
- execute();
+ execute(/*retry*/true);
}
catch (IgniteCheckedException e) {
onDone(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 87a5536..2dacd12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -817,7 +817,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
private <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
- IgniteInternalFuture<T> fail = asyncOpAcquire();
+ IgniteInternalFuture<T> fail = asyncOpAcquire(/*retry*/false);
if (fail != null)
return fail;
@@ -841,14 +841,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- saveFuture(holder, f);
+ saveFuture(holder, f, /*retry*/false);
return f;
}
IgniteInternalFuture<T> f = op.apply();
- saveFuture(holder, f);
+ saveFuture(holder, f, /*retry*/false);
return f;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f922d09..2292cb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTran
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
@@ -72,7 +71,6 @@ import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -241,7 +239,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
}
});
}
- }, opCtx);
+ }, opCtx, /*retry*/false);
}
AffinityTopologyVersion topVer = tx == null ?
@@ -308,7 +306,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
recovery,
needVer);
}
- }, opCtx);
+ }, opCtx, /*retry*/false);
}
AffinityTopologyVersion topVer = tx == null ?
http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 1468e8a..cc90be0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -152,7 +152,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
recovery,
needVer);
}
- }, opCtx);
+ }, opCtx, /*retry*/false);
}
subjId = ctx.subjectIdPerCall(subjId, opCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index b8c0e36..dfbc5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1464,7 +1464,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
*/
@SuppressWarnings("unchecked")
private IgniteInternalFuture asyncOp(final Callable<?> op) {
- IgniteInternalFuture fail = asyncOpAcquire();
+ IgniteInternalFuture fail = asyncOpAcquire(/*retry*/false);
if (fail != null)
return fail;
@@ -1484,14 +1484,14 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
}
});
- saveFuture(holder, f);
+ saveFuture(holder, f, /*retry*/false);
return f;
}
IgniteInternalFuture f = ctx.closures().callLocalSafe(op);
- saveFuture(holder, f);
+ saveFuture(holder, f, /*retry*/false);
return f;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
index 3c241b8..10d75ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java
@@ -119,7 +119,7 @@ public class GridCacheReplicatedSynchronousCommitTest extends GridCommonAbstract
for (TestCommunicationSpi commSpi : commSpis)
cnt += commSpi.messagesCount();
- assert cnt == ADDITION_CACHE_NUMBER;
+ assertEquals(ADDITION_CACHE_NUMBER, cnt);
}
finally {
stopAllGrids();