You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/11/05 04:28:33 UTC

[41/66] [abbrv] ignite git commit: IGNITE-950 - Fixing context for async ops. Debug is enabled.

IGNITE-950 - Fixing context for async ops. Debug is enabled.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f261704c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f261704c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f261704c

Branch: refs/heads/ignite-1753-1282
Commit: f261704c3b83b97d95b2dabecfaf35096a6a4f9f
Parents: ab32d0a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Nov 3 18:37:55 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Nov 3 18:37:55 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 82 ++++++++++++--------
 .../dht/colocated/GridDhtColocatedCache.java    |  2 +-
 .../near/GridNearTransactionalCache.java        |  2 +-
 3 files changed, 50 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9a61bdb..07a4ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1823,7 +1823,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
                     return tx.getAllAsync(ctx, keys, deserializePortable, skipVals, false, !readThrough);
                 }
-            });
+            }, ctx.operationContextPerCall());
         }
     }
 
@@ -3987,11 +3987,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
 
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
         if (tx == null || tx.implicit()) {
             boolean skipStore = ctx.skipStore(); // Save value of thread-local flag.
 
-            CacheOperationContext opCtx = ctx.operationContextPerCall();
-
             int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES;
 
             if (retries == 1) {
@@ -4005,10 +4005,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     !skipStore,
                     0);
 
-                return asyncOp(tx, op);
+                return asyncOp(tx, op, opCtx);
             }
             else {
-                AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, skipStore, retries);
+                AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, retries, opCtx);
 
                 fut.execute();
 
@@ -4016,7 +4016,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             }
         }
         else
-            return asyncOp(tx, op);
+            return asyncOp(tx, op, opCtx);
     }
 
     /**
@@ -4026,7 +4026,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Future.
      */
     @SuppressWarnings("unchecked")
-    protected <T> IgniteInternalFuture<T> asyncOp(IgniteTxLocalAdapter tx, final AsyncOp<T> op) {
+    protected <T> IgniteInternalFuture<T> asyncOp(
+        IgniteTxLocalAdapter tx,
+        final AsyncOp<T> op,
+        final CacheOperationContext opCtx
+    ) {
         IgniteInternalFuture<T> fail = asyncOpAcquire();
 
         if (fail != null)
@@ -4049,24 +4053,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                 return new GridFinishedFuture<>(
                                     new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
 
-                            return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
-                                @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
-                                    try {
-                                        return tFut.get();
-                                    }
-                                    catch (IgniteTxRollbackCheckedException e) {
-                                        throw e;
-                                    }
-                                    catch (IgniteCheckedException e1) {
-                                        tx0.rollbackAsync();
+                            ctx.operationContextPerCall(opCtx);
 
-                                        throw e1;
-                                    }
-                                    finally {
-                                        ctx.shared().txContextReset();
+                            try {
+                                return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
+                                    @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
+                                        try {
+                                            return tFut.get();
+                                        }
+                                        catch (IgniteTxRollbackCheckedException e) {
+                                            throw e;
+                                        }
+                                        catch (IgniteCheckedException e1) {
+                                            tx0.rollbackAsync();
+
+                                            throw e1;
+                                        }
+                                        finally {
+                                            ctx.shared().txContextReset();
+                                        }
                                     }
-                                }
-                            });
+                                });
+                            }
+                            finally {
+                                ctx.operationContextPerCall(null);
+                            }
                         }
                     });
 
@@ -4631,28 +4642,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         private AsyncOp<T> op;
 
         /** */
-        private boolean skipStore;
-
-        /** */
         private int retries;
 
         /** */
         private IgniteTxLocalAdapter tx;
 
+        /** */
+        private CacheOperationContext opCtx;
+
         /**
          * @param op Operation.
-         * @param skipStore Skip store flag.
          * @param retries Number of retries.
+         * @param opCtx Operation context per call to save.
          */
-        public AsyncOpRetryFuture(AsyncOp<T> op,
-            boolean skipStore,
-            int retries) {
+        public AsyncOpRetryFuture(
+            AsyncOp<T> op,
+            int retries,
+            CacheOperationContext opCtx
+        ) {
             assert retries > 1 : retries;
 
+            tx = null;
+
             this.op = op;
-            this.tx = null;
-            this.skipStore = skipStore;
             this.retries = retries;
+            this.opCtx = opCtx;
         }
 
         /**
@@ -4666,10 +4680,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 OPTIMISTIC,
                 READ_COMMITTED,
                 ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
-                !skipStore,
+                !opCtx.skipStore(),
                 0);
 
-            IgniteInternalFuture<T> fut = asyncOp(tx, op);
+            IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx);
 
             fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() {
                 @Override public void apply(IgniteInternalFuture<T> fut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index efc10b2..907c68d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -216,7 +216,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         false,
                         opCtx != null && opCtx.skipStore());
                 }
-            });
+            }, opCtx);
         }
 
         AffinityTopologyVersion topVer = tx == null ?

http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 8740e44..65a054c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -147,7 +147,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                         false,
                         skipStore);
                 }
-            });
+            }, opCtx);
         }
 
         subjId = ctx.subjectIdPerCall(subjId, opCtx);