You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/23 04:53:25 UTC

[22/24] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/73f1be64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/73f1be64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/73f1be64

Branch: refs/heads/ignite-104
Commit: 73f1be64fe7beef1b687aaadcc3184991e49fa10
Parents: b181411
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 22 18:38:45 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 22 18:38:45 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  9 ++++--
 .../dht/atomic/GridDhtAtomicCache.java          | 31 ++++++++++++++------
 .../distributed/near/GridNearAtomicCache.java   | 22 +++++++++++---
 3 files changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73f1be64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4680994..b0237f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1611,7 +1611,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         CacheObject oldVal;
         CacheObject updated;
 
-        GridCacheVersion enqueueVer = null;
+        GridCacheVersion rmvVer = null;
 
         GridCacheVersionConflictContext<?, ?> conflictCtx = null;
 
@@ -2120,7 +2120,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     }
                 }
 
-                enqueueVer = newVer;
+                rmvVer = newVer;
 
                 boolean hasValPtr = hasOffHeapPointer();
 
@@ -2163,6 +2163,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     }
                 }
 
+                if (!cctx.deferredDelete())
+                    markObsolete(rmvVer);
+
                 res = hadVal;
             }
 
@@ -2194,7 +2197,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             invokeRes,
             newSysTtl,
             newSysExpireTime,
-            enqueueVer,
+            rmvVer,
             conflictCtx,
             true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73f1be64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aaf373d..fb309c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1181,13 +1181,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (locked != null)
                     unlockEntries(locked, req.topologyVersion());
 
-                // Enqueue if necessary after locks release.
                 if (deleted != null) {
                     assert !deleted.isEmpty();
-                    assert ctx.deferredDelete(this) : this;
 
-                    for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted)
-                        ctx.onDeferredDelete(e.get1(), e.get2());
+                    boolean deferred = ctx.deferredDelete();
+
+                    for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted) {
+                        if (deferred)
+                            ctx.onDeferredDelete(e.get1(), e.get2());
+                        else {
+                            GridDhtCacheEntry entry = e.get1();
+
+                            assert entry.obsolete();
+
+                            removeEntry(entry);
+                        }
+                    }
                 }
             }
         }
@@ -2182,9 +2191,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param topVer Topology version.
      */
     private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) {
-        // Process deleted entries before locks release.
-        assert ctx.deferredDelete(this) : this;
-
         // Entries to skip eviction manager notification for.
         // Enqueue entries while holding locks.
         Collection<KeyCacheObject> skip = null;
@@ -2468,8 +2474,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             req.subjectId(),
                             taskName);
 
-                        if (updRes.removeVersion() != null)
-                            ctx.onDeferredDelete(entry, updRes.removeVersion());
+                        if (updRes.removeVersion() != null) {
+                            if (ctx.deferredDelete())
+                                ctx.onDeferredDelete(entry, updRes.removeVersion());
+                            else {
+                                assert entry.obsolete();
+
+                                removeEntry(entry);
+                            }
+                        }
 
                         entry.onUnlock();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/73f1be64/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ed07f8f..f8fa573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -241,8 +241,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         subjId,
                         taskName);
 
-                    if (updRes.removeVersion() != null)
-                        ctx.onDeferredDelete(entry, updRes.removeVersion());
+                    if (updRes.removeVersion() != null) {
+                        if (ctx.deferredDelete())
+                            ctx.onDeferredDelete(entry, updRes.removeVersion());
+                        else {
+                            assert entry.obsolete();
+
+                            removeEntry(entry);
+                        }
+                    }
 
                     break; // While.
                 }
@@ -339,8 +346,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             req.subjectId(),
                             taskName);
 
-                        if (updRes.removeVersion() != null)
-                            ctx.onDeferredDelete(entry, updRes.removeVersion());
+                        if (updRes.removeVersion() != null) {
+                            if (ctx.deferredDelete())
+                                ctx.onDeferredDelete(entry, updRes.removeVersion());
+                            else {
+                                assert entry.obsolete();
+
+                                removeEntry(entry);
+                            }
+                        }
 
                         break;
                     }