You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/19 09:39:58 UTC

[04/50] incubator-ignite git commit: ignite-946: fixing version retrieval for transactions

ignite-946: fixing version retrieval for transactions


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2e7799d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2e7799d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2e7799d4

Branch: refs/heads/ignite-gg-9615-1
Commit: 2e7799d446653bba379cc231628ba2b02c993e5e
Parents: f0fe076
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jul 31 15:32:45 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Jul 31 15:49:14 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheInvokeEntry.java      |  2 +-
 .../processors/cache/GridCacheMapEntry.java     |  4 +-
 .../cache/transactions/IgniteTxAdapter.java     | 14 ++-
 .../cache/transactions/IgniteTxEntry.java       | 11 ++-
 .../transactions/IgniteTxLocalAdapter.java      | 90 ++++++++++++++++----
 5 files changed, 98 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
index e6f8d4e..2d8f738 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
@@ -121,7 +121,7 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> cls) {
-        if (cls.isAssignableFrom(VersionedEntry.class))
+        if (cls.isAssignableFrom(VersionedEntry.class) && ver != null)
             return (T)new CacheVersionedEntryImpl<>(getKey(), getValue(), ver);
 
         return super.unwrap(cls);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ebcb908..43cf2fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1653,7 +1653,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                         oldVal = rawGetOrUnmarshalUnlocked(true);
 
-                        CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver);
+                        CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version());
 
                         try {
                             Object computed = entryProcessor.process(entry, invokeArgs);
@@ -1878,7 +1878,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
 
-                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver);
+                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version());
 
                 try {
                     Object computed = entryProcessor.process(entry, invokeArgs);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 0d14012..797f75e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1228,9 +1228,21 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
 
                 Object key = null;
 
+                GridCacheVersion ver;
+
+                try {
+                    ver = txEntry.cached().version();
+                }
+                catch (GridCacheEntryRemovedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+
+                    ver = null;
+                }
+
                 for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
                     CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(),
-                        txEntry.key(), key, cacheVal, val, txEntry.cached().version());
+                        txEntry.key(), key, cacheVal, val, ver);
 
                     try {
                         EntryProcessor<Object, Object, Object> processor = t.get1();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 7f06380..ed57bf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -571,10 +571,19 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         Object val = null;
         Object keyVal = null;
 
+        GridCacheVersion ver;
+
+        try {
+            ver = entry.version();
+        }
+        catch (GridCacheEntryRemovedException e) {
+            ver = null;
+        }
+
         for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) {
             try {
                 CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val,
-                    entry.version());
+                    ver);
 
                 EntryProcessor processor = t.get1();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d8797fe..7f171c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1938,13 +1938,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         Map<KeyCacheObject, GridCacheDrInfo> drMap
     ) {
         return this.<Object, Object>putAllAsync0(cacheCtx,
-            null,
-            null,
-            null,
-            drMap,
-            false,
-            null,
-            null);
+                                                 null,
+                                                 null,
+                                                 null,
+                                                 drMap,
+                                                 false,
+                                                 null,
+                                                 null);
     }
 
     /** {@inheritDoc} */
@@ -2229,8 +2229,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     if (retval && !transform)
                                         ret.set(cacheCtx, old, true);
                                     else {
-                                        if (txEntry.op() == TRANSFORM)
-                                            addInvokeResult(txEntry, old, ret);
+                                        if (txEntry.op() == TRANSFORM) {
+                                            GridCacheVersion ver;
+
+                                            try {
+                                                ver = entry.version();
+                                            }
+                                            catch (GridCacheEntryRemovedException ex) {
+                                                if (log.isDebugEnabled())
+                                                    log.debug("Failed to get entry version: [msg=" +
+                                                        ex.getMessage() + ']');
+
+                                                ver = null;
+                                            }
+
+                                            addInvokeResult(txEntry, old, ret, ver);
+                                        }
                                         else
                                             ret.success(true);
                                     }
@@ -2290,8 +2304,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                         enlisted.add(cacheKey);
 
-                        if (txEntry.op() == TRANSFORM)
-                            addInvokeResult(txEntry, txEntry.value(), ret);
+                        if (txEntry.op() == TRANSFORM) {
+                            GridCacheVersion ver;
+
+                            try {
+                                ver = entry.version();
+                            }
+                            catch (GridCacheEntryRemovedException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+
+                                ver = null;
+                            }
+
+                            addInvokeResult(txEntry, txEntry.value(), ret, ver);
+                        }
                     }
 
                     if (!pessimistic()) {
@@ -2328,8 +2355,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                         CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
-                        if (e.op() == TRANSFORM)
-                            addInvokeResult(e, cacheVal, ret);
+                        if (e.op() == TRANSFORM) {
+                            GridCacheVersion ver;
+
+                            try {
+                                ver = e.cached().version();
+                            }
+                            catch (GridCacheEntryRemovedException ex) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+
+                                ver = null;
+                            }
+
+                            addInvokeResult(e, cacheVal, ret, ver);
+                        }
                         else
                             ret.set(cacheCtx, cacheVal, true);
                     }
@@ -2442,8 +2482,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         }
 
                         if (txEntry.op() == TRANSFORM) {
-                            if (computeInvoke)
-                                addInvokeResult(txEntry, v, ret);
+                            if (computeInvoke) {
+                                GridCacheVersion ver;
+
+                                try {
+                                    ver = cached.version();
+                                }
+                                catch (GridCacheEntryRemovedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+
+                                    ver = null;
+                                }
+
+                                addInvokeResult(txEntry, v, ret, ver);
+                            }
                         }
                         else
                             ret.value(cacheCtx, v);
@@ -2510,8 +2563,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param txEntry Entry.
      * @param cacheVal Value.
      * @param ret Return value to update.
+     * @param ver Entry version.
      */
-    private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn ret) {
+    private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn ret,
+        GridCacheVersion ver) {
         GridCacheContext ctx = txEntry.context();
 
         Object key0 = null;
@@ -2522,8 +2577,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
             for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
                 CacheInvokeEntry<Object, Object> invokeEntry =
-                    new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0,
-                        txEntry.cached().version());
+                    new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, ver);
 
                 EntryProcessor<Object, Object, ?> entryProcessor = t.get1();