You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/11/05 04:28:33 UTC
[41/66] [abbrv] ignite git commit: IGNITE-950 - Fixing context for
async ops. Debug is enabled.
IGNITE-950 - Fixing context for async ops. Debug is enabled.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f261704c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f261704c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f261704c
Branch: refs/heads/ignite-1753-1282
Commit: f261704c3b83b97d95b2dabecfaf35096a6a4f9f
Parents: ab32d0a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Nov 3 18:37:55 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Nov 3 18:37:55 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 82 ++++++++++++--------
.../dht/colocated/GridDhtColocatedCache.java | 2 +-
.../near/GridNearTransactionalCache.java | 2 +-
3 files changed, 50 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9a61bdb..07a4ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1823,7 +1823,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
return tx.getAllAsync(ctx, keys, deserializePortable, skipVals, false, !readThrough);
}
- });
+ }, ctx.operationContextPerCall());
}
}
@@ -3987,11 +3987,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
if (tx == null || tx.implicit()) {
boolean skipStore = ctx.skipStore(); // Save value of thread-local flag.
- CacheOperationContext opCtx = ctx.operationContextPerCall();
-
int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES;
if (retries == 1) {
@@ -4005,10 +4005,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
!skipStore,
0);
- return asyncOp(tx, op);
+ return asyncOp(tx, op, opCtx);
}
else {
- AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, skipStore, retries);
+ AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, retries, opCtx);
fut.execute();
@@ -4016,7 +4016,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
}
else
- return asyncOp(tx, op);
+ return asyncOp(tx, op, opCtx);
}
/**
@@ -4026,7 +4026,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Future.
*/
@SuppressWarnings("unchecked")
- protected <T> IgniteInternalFuture<T> asyncOp(IgniteTxLocalAdapter tx, final AsyncOp<T> op) {
+ protected <T> IgniteInternalFuture<T> asyncOp(
+ IgniteTxLocalAdapter tx,
+ final AsyncOp<T> op,
+ final CacheOperationContext opCtx
+ ) {
IgniteInternalFuture<T> fail = asyncOpAcquire();
if (fail != null)
@@ -4049,24 +4053,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return new GridFinishedFuture<>(
new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
- return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
- @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
- try {
- return tFut.get();
- }
- catch (IgniteTxRollbackCheckedException e) {
- throw e;
- }
- catch (IgniteCheckedException e1) {
- tx0.rollbackAsync();
+ ctx.operationContextPerCall(opCtx);
- throw e1;
- }
- finally {
- ctx.shared().txContextReset();
+ try {
+ return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
+ @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
+ try {
+ return tFut.get();
+ }
+ catch (IgniteTxRollbackCheckedException e) {
+ throw e;
+ }
+ catch (IgniteCheckedException e1) {
+ tx0.rollbackAsync();
+
+ throw e1;
+ }
+ finally {
+ ctx.shared().txContextReset();
+ }
}
- }
- });
+ });
+ }
+ finally {
+ ctx.operationContextPerCall(null);
+ }
}
});
@@ -4631,28 +4642,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private AsyncOp<T> op;
/** */
- private boolean skipStore;
-
- /** */
private int retries;
/** */
private IgniteTxLocalAdapter tx;
+ /** */
+ private CacheOperationContext opCtx;
+
/**
* @param op Operation.
- * @param skipStore Skip store flag.
* @param retries Number of retries.
+ * @param opCtx Operation context per call to save.
*/
- public AsyncOpRetryFuture(AsyncOp<T> op,
- boolean skipStore,
- int retries) {
+ public AsyncOpRetryFuture(
+ AsyncOp<T> op,
+ int retries,
+ CacheOperationContext opCtx
+ ) {
assert retries > 1 : retries;
+ tx = null;
+
this.op = op;
- this.tx = null;
- this.skipStore = skipStore;
this.retries = retries;
+ this.opCtx = opCtx;
}
/**
@@ -4666,10 +4680,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
OPTIMISTIC,
READ_COMMITTED,
ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
- !skipStore,
+ !opCtx.skipStore(),
0);
- IgniteInternalFuture<T> fut = asyncOp(tx, op);
+ IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx);
fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() {
@Override public void apply(IgniteInternalFuture<T> fut) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index efc10b2..907c68d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -216,7 +216,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
false,
opCtx != null && opCtx.skipStore());
}
- });
+ }, opCtx);
}
AffinityTopologyVersion topVer = tx == null ?
http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 8740e44..65a054c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -147,7 +147,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
false,
skipStore);
}
- });
+ }, opCtx);
}
subjId = ctx.subjectIdPerCall(subjId, opCtx);