You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/23 02:57:19 UTC

[03/14] ignite git commit: ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing

ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621eb0f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621eb0f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621eb0f7

Branch: refs/heads/ignite-1171-debug
Commit: 621eb0f75bbe1a0a623229dded38a3549309eead
Parents: 8b94494
Author: sboikov <se...@inria.fr>
Authored: Mon Sep 21 21:37:52 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon Sep 21 21:37:52 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 37 +++++++++++++-------
 .../processors/cache/GridCacheProcessor.java    |  2 +-
 .../processors/cache/GridCacheSwapManager.java  | 24 ++++++-------
 .../datastreamer/DataStreamerImpl.java          |  2 --
 4 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f2bb646..961c792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1588,6 +1588,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 boolean hasValPtr = hasOffHeapPointer();
 
+                if (old == null)
+                    old = saveValueForIndexUnlocked();
+
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
                 clearIndex(old);
@@ -2163,6 +2166,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     // Must persist inside synchronization in non-tx mode.
                     cctx.store().remove(null, keyValue(false));
 
+                if (oldVal == null)
+                    oldVal = saveValueForIndexUnlocked();
+
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
                 clearIndex(oldVal);
@@ -3342,7 +3348,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         try {
             synchronized (this) {
-                CacheObject expiredVal = saveValueForIndexUnlocked();
+                CacheObject expiredVal = saveOldValueUnlocked(false);
 
                 boolean hasOldBytes = hasOffHeapPointer();
 
@@ -3523,12 +3529,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            if (qryMgr != null && qryMgr.enabled()) {
-                qryMgr.store(key,
-                    val,
-                    ver,
-                    expireTime);
-            }
+            if (qryMgr.enabled())
+                qryMgr.store(key, val, ver, expireTime);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -3547,8 +3549,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
 
-            if (qryMgr != null)
-                qryMgr.remove(key(), prevVal == null ? null : prevVal);
+            if (qryMgr.enabled())
+                qryMgr.remove(key(), prevVal);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -3562,10 +3564,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @return Previous value or {@code null}.
      * @throws IgniteCheckedException If failed to retrieve previous value.
      */
-    protected CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+    protected final CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+        return saveOldValueUnlocked(true);
+    }
+
+    /**
+     * @param qryOnly If {@code true} reads old value only if query indexing is enabled.
+     * @return Previous value or {@code null}.
+     * @throws IgniteCheckedException If failed to retrieve previous value.
+     */
+    private CacheObject saveOldValueUnlocked(boolean qryOnly) throws IgniteCheckedException {
         assert Thread.holdsLock(this);
 
-        if (cctx.queries() == null)
+        if (qryOnly && !cctx.queries().enabled())
             return null;
 
         CacheObject val = rawGetOrUnmarshalUnlocked(false);
@@ -3681,7 +3692,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     if (obsoleteVersionExtras() != null)
                         return true;
 
-                    CacheObject prev = saveValueForIndexUnlocked();
+                    CacheObject prev = saveOldValueUnlocked(false);
 
                     if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
                         if (swap) {
@@ -3791,7 +3802,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 GridCacheQueryManager qryMgr = cctx.queries();
 
-                if (qryMgr != null)
+                if (qryMgr.enabled())
                     qryMgr.onUnswap(key, prevVal);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c92de7d..7c16136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2759,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (spaceName.equals(CU.swapSpaceName(cctx))) {
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            if (qryMgr != null) {
+            if (qryMgr.enabled()) {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 9b6381e..d9a8b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -696,12 +696,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+        if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
             return null; // Not found.
 
         swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
-                if (qryMgr == null && cctx.config().isStatisticsEnabled())
+                if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
                     cctx.cache().metrics0().onSwapRead(rmv != null);
 
                 if (rmv != null) {
@@ -843,7 +843,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridCacheSwapEntry entry;
 
-        if (qryMgr != null) {
+        if (qryMgr.enabled()) {
             entry = readOffheapBeforeRemove(key, keyBytes, part);
 
             if (entry != null) {
@@ -952,7 +952,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         ClassLoader ldr = cctx.deploy().globalLoader();
 
-        if (qryMgr != null) { // Unswap for indexing.
+        if (qryMgr.enabled()) { // Unswap for indexing.
             Iterator<SwapKey> iter = unprocessedKeys.iterator();
 
             while (iter.hasNext()) {
@@ -967,7 +967,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             unprocessedKeys,
             new IgniteBiInClosure<SwapKey, byte[]>() {
                 @Override public void apply(SwapKey swapKey, byte[] rmv) {
-                    if (qryMgr == null && cctx.config().isStatisticsEnabled())
+                    if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
                         cctx.cache().metrics0().onSwapRead(rmv != null);
 
                     if (rmv != null) {
@@ -1124,7 +1124,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      */
     public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
         throws IgniteCheckedException {
-        assert cctx.queries() != null;
+        assert cctx.queries().enabled();
 
         byte[] entryBytes = offheap.get(spaceName, part, key, keyBytes);
 
@@ -1155,7 +1155,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      */
     private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr)
         throws IgniteCheckedException {
-        assert cctx.queries() != null;
+        assert cctx.queries().enabled();
 
         byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr);
 
@@ -1196,7 +1196,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
 
-            if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+            if ((!qryMgr.enabled() || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
                 offheap.removex(spaceName, part, key, keyBytes)) {
                 if (cctx.config().isStatisticsEnabled())
                     cctx.cache().metrics0().onOffHeapRemove();
@@ -1212,7 +1212,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
             ClassLoader ldr = cctx.deploy().globalLoader();
 
-            if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+            if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
                 return; // Not found.
 
             swapMgr.remove(spaceName,
@@ -1279,7 +1279,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (qryMgr != null)
+        if (qryMgr.enabled())
             qryMgr.onSwap(key);
     }
 
@@ -1308,7 +1308,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
 
-                if (qryMgr != null)
+                if (qryMgr.enabled())
                     qryMgr.onSwap(swapEntry.key());
             }
         }
@@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.events().addEvent(batchSwapEntry.partition(), batchSwapEntry.key(), cctx.nodeId(),
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null);
 
-                    if (qryMgr != null)
+                    if (qryMgr.enabled())
                         qryMgr.onSwap(batchSwapEntry.key());
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index b5d9a7d..ab2a6e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1569,8 +1569,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                     GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 
-                    entry.unswap(false);
-
                     if (plc != null) {
                         ttl = CU.toTtl(plc.getExpiryForCreation());