You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/07/07 12:53:53 UTC

[02/50] [abbrv] incubator-ignite git commit: ignite-973-2 - swap read before remove

ignite-973-2 - swap read before remove


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e1243b40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e1243b40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e1243b40

Branch: refs/heads/ignite-1067
Commit: e1243b40537ac883271eb7cab492e8ef86f7b330
Parents: e0b573b
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 23 23:13:03 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 23 23:13:03 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheSwapManager.java  | 98 +++++++++++++-------
 .../inmemory/GridTestSwapSpaceSpi.java          |  3 +-
 .../processors/query/h2/opt/GridH2Table.java    |  2 +-
 .../cache/IgniteCacheOffheapEvictQueryTest.java |  2 +-
 4 files changed, 66 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index e45ec2d..7595a1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -582,6 +582,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             part,
             key.valueBytes(cctx.cacheObjectContext()));
 
+        ClassLoader ldr = cctx.deploy().globalLoader();
+
+        GridCacheQueryManager qryMgr = cctx.queries();
+
+        if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+            return null; // Not found.
+
         swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
                 if (cctx.config().isStatisticsEnabled())
@@ -597,7 +604,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         t.set(entry);
 
                         CacheObject v = entry.value();
-                        byte[] valBytes = entry.valueBytes();
 
                         // Event notification.
                         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNSWAPPED)) {
@@ -621,18 +627,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                         // Always fire this event, since preloading depends on it.
                         onUnswapped(part, key, entry);
-
-                        GridCacheQueryManager qryMgr = cctx.queries();
-
-                        if (qryMgr != null)
-                            qryMgr.onUnswap(key, v);
                     }
                     catch (IgniteCheckedException e) {
                         err.set(e);
                     }
                 }
             }
-        }, cctx.deploy().globalLoader());
+        }, ldr);
 
         if (err.get() != null)
             throw err.get();
@@ -839,7 +840,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         assert swapEnabled;
         assert unprocessedKeys != null;
 
-        // Swap is enabled.
+        ClassLoader ldr = cctx.deploy().globalLoader();
+
+        if (qryMgr != null) { // Unswap for indexing.
+            Iterator<SwapKey> iter = unprocessedKeys.iterator();
+
+            while (iter.hasNext()) {
+                if (!readSwapBeforeRemove(null, iter.next(), ldr))
+                    iter.remove(); // We will not do unswapping further -> need to skip the key.
+            }
+        }
+
         final GridTuple<IgniteCheckedException> err = F.t1();
 
         swapMgr.removeAll(spaceName,
@@ -891,9 +902,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                             // Always fire this event, since preloading depends on it.
                             onUnswapped(swapKey.partition(), key, entry);
-
-                            if (qryMgr != null)
-                                qryMgr.onUnswap(key, entry.value());
                         }
                         catch (IgniteCheckedException e) {
                             err.set(e);
@@ -901,7 +909,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     }
                 }
             },
-            cctx.deploy().globalLoader());
+            ldr);
 
         if (err.get() != null)
             throw err.get();
@@ -923,7 +931,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
 
-        if(rmv && cctx.config().isStatisticsEnabled())
+        if (rmv && cctx.config().isStatisticsEnabled())
             cctx.cache().metrics0().onOffHeapRemove();
 
         return rmv;
@@ -982,6 +990,37 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Reads value from swap and unswaps it to indexing.
+     *
+     * @param key Key.
+     * @param swapKey Swap key.
+     * @param ldr Class loader.
+     * @return {@code true} If read and unswapped successfully.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr)
+        throws IgniteCheckedException {
+        assert cctx.queries() != null;
+
+        byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr);
+
+        if (entryBytes == null)
+            return false;
+
+        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+
+        if (entry == null)
+            return false;
+
+        if (key == null)
+            key = cctx.toCacheKeyObject(swapKey.keyBytes());
+
+        cctx.queries().onUnswap(key, entry.value());
+
+        return true;
+    }
+
+    /**
      * @param key Key to remove.
      * @throws IgniteCheckedException If failed.
      */
@@ -1013,33 +1052,20 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 part,
                 key.valueBytes(cctx.cacheObjectContext()));
 
+            ClassLoader ldr = cctx.deploy().globalLoader();
+
+            if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+                return; // Not found.
+
             swapMgr.remove(spaceName,
                 swapKey,
-                new CI1<byte[]>() {
+                cctx.config().isStatisticsEnabled() ? new CI1<byte[]>() {
                     @Override public void apply(byte[] rmv) {
-                        if (rmv == null)
-                            return;
-
-                        try {
-                            if (cctx.config().isStatisticsEnabled())
-                                cctx.cache().metrics0().onSwapRemove();
-
-                            if (qryMgr == null)
-                                return;
-
-                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
-
-                            if (entry == null)
-                                return;
-
-                            qryMgr.onUnswap(key, entry.value());
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw new IgniteException(e);
-                        }
+                        if (rmv != null)
+                            cctx.cache().metrics0().onSwapRemove();
                     }
-                },
-                cctx.deploy().globalLoader());
+                } : null,
+                ldr);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java
index d8303a4..2a3c940 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java
@@ -285,7 +285,8 @@ public class GridTestSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceS
             byte[] val = data.remove(key);
 
             if (val != null) {
-                c.apply(val);
+                if (c != null)
+                    c.apply(val);
 
                 fireEvent(EVT_SWAP_SPACE_DATA_REMOVED, name, key.keyBytes());
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 92991af..86dbf06 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -393,7 +393,7 @@ public class GridH2Table extends TableBase {
                     for (int i = 2, len = idxs.size(); i < len; i++) {
                         Row res = index(i).remove(old);
 
-                        assert eq(pk, res, old): "\n" + old + "\n" + res;
+                        assert eq(pk, res, old): "\n" + old + "\n" + res + "\n" + i + " -> " + index(i).getName();
                     }
                 }
                 else

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
index 45d744e..f9ff69e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java
@@ -86,7 +86,7 @@ public class IgniteCacheOffheapEvictQueryTest extends GridCommonAbstractTest {
      */
     public void testEvictAndRemove() throws Exception {
         final int KEYS_CNT = 3000;
-        final int THREADS_CNT = 50;
+        final int THREADS_CNT = 250;
 
         final IgniteCache<Integer,Integer> c = startGrid().cache(null);