You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/03 17:08:29 UTC

[11/40] incubator-ignite git commit: ignite-973-2 - read offheap value before remove

ignite-973-2 - read offheap value before remove


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/260dc2dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/260dc2dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/260dc2dd

Branch: refs/heads/ignite-gg-7813
Commit: 260dc2dd4978d0a57732b7edd0aa0b043d4eff4c
Parents: 285d790
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 23 15:28:24 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 23 15:28:24 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheSwapManager.java  | 192 +++++++++++--------
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |   8 +-
 2 files changed, 118 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260dc2dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index f709e03..e45ec2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -535,21 +535,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         // First try removing from offheap.
         if (offheapEnabled) {
-            byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
-
-            if (cctx.config().isStatisticsEnabled()) {
-                if (entryBytes != null)
-                    cctx.cache().metrics0().onOffHeapRemove();
-
-                cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
-            }
-
-            if (entryBytes != null) {
-                GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
-
-                if (entry == null)
-                    return null;
+            GridCacheSwapEntry entry = removeFromOffheap(key, key.valueBytes(cctx.cacheObjectContext()), part);
 
+            if (entry != null) {
                 // Always fire this event, since preloading depends on it.
                 onOffHeaped(part, key, entry);
 
@@ -569,11 +557,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         null,
                         null);
 
-                GridCacheQueryManager qryMgr = cctx.queries();
-
-                if (qryMgr != null)
-                    qryMgr.onUnswap(key, entry.value());
-
                 return entry;
             }
         }
@@ -737,6 +720,47 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param key Key.
+     * @param keyBytes Key bytes.
+     * @param part Partition.
+     * @return Swap entry.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private GridCacheSwapEntry removeFromOffheap(KeyCacheObject key, byte[] keyBytes, int part)
+        throws IgniteCheckedException {
+        final GridCacheQueryManager qryMgr = cctx.queries();
+
+        GridCacheSwapEntry entry;
+
+        if (qryMgr != null) {
+            entry = readOffheapBeforeRemove(key, keyBytes, part);
+
+            if (entry != null) {
+                if (offheap.removex(spaceName, part, key, keyBytes)) {
+                    if (cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapRemove();
+                }
+                else
+                    entry = null; // Failed to remove -> reset to null.
+            }
+        }
+        else {
+            byte[] entryBytes = offheap.remove(spaceName, part, key, keyBytes);
+
+            if (entryBytes != null) {
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
+
+                entry = swapEntry(unmarshalSwapEntry(entryBytes));
+            }
+            else
+                entry = null;
+        }
+
+        return entry;
+    }
+
+    /**
      * @param keys Collection of keys to remove from swap.
      * @return Collection of swap entries.
      * @throws IgniteCheckedException If failed,
@@ -759,40 +783,30 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             for (KeyCacheObject key : keys) {
                 int part = cctx.affinity().partition(key);
 
-                byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                GridCacheSwapEntry entry = removeFromOffheap(key, key.valueBytes(cctx.cacheObjectContext()), part);
 
-                if(entryBytes != null && cctx.config().isStatisticsEnabled())
-                    cctx.cache().metrics0().onOffHeapRemove();
+                if (entry != null) {
+                    // Always fire this event, since preloading depends on it.
+                    onOffHeaped(part, key, entry);
 
-                if (entryBytes != null) {
-                    GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+                    if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP))
+                        cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
+                            EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null);
 
-                    if (entry != null) {
-                        // Always fire this event, since preloading depends on it.
-                        onOffHeaped(part, key, entry);
-
-                        if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP))
-                            cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
-                                EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null);
-
-                        if (qryMgr != null)
-                            qryMgr.onUnswap(key, entry.value());
-
-                        GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key,
-                            part,
-                            ByteBuffer.wrap(entry.valueBytes()),
-                            entry.type(),
-                            entry.version(), entry.ttl(),
-                            entry.expireTime(),
-                            entry.keyClassLoaderId(),
-                            entry.valueClassLoaderId());
+                    GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key,
+                        part,
+                        ByteBuffer.wrap(entry.valueBytes()),
+                        entry.type(),
+                        entry.version(), entry.ttl(),
+                        entry.expireTime(),
+                        entry.keyClassLoaderId(),
+                        entry.valueClassLoaderId());
 
-                        unswapped.value(entry.value());
+                    unswapped.value(entry.value());
 
-                        res.add(unswapped);
+                    res.add(unswapped);
 
-                        continue;
-                    }
+                    continue;
                 }
 
                 if (swapEnabled) {
@@ -940,6 +954,34 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Reads value from offheap and unswaps it for indexing.
+     *
+     * @param key Key.
+     * @param keyBytes Key bytes.
+     * @param part Partition.
+     * @return Swap entry.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
+        throws IgniteCheckedException {
+        assert cctx.queries() != null;
+
+        byte[] val = offheap.get(spaceName, part, key, keyBytes);
+
+        if (val != null) {
+            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(val));
+
+            if (entry != null) {
+                cctx.queries().onUnswap(key, entry.value());
+
+                return entry;
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * @param key Key to remove.
      * @throws IgniteCheckedException If failed.
      */
@@ -951,42 +993,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
-        CI1<byte[]> c = qryMgr == null ? null : new CI1<byte[]>() {
-            @Override public void apply(byte[] rmv) {
-                if (rmv == null)
-                    return;
-
-                try {
-                    if (cctx.config().isStatisticsEnabled())
-                        cctx.cache().metrics0().onSwapRemove();
-
-                    GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
-
-                    if (entry == null)
-                        return;
-
-                    qryMgr.onUnswap(key, entry.value());
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-            }
-        };
-
         int part = cctx.affinity().partition(key);
 
         // First try offheap.
         if (offheapEnabled) {
-            // TODO Pass closure c to offheap.remove and apply it before the actual remove.
-            byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+            byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
 
-            if (val != null) {
+            if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+                offheap.removex(spaceName, part, key, keyBytes)) {
                 if (cctx.config().isStatisticsEnabled())
                     cctx.cache().metrics0().onOffHeapRemove();
 
-                if (c != null)
-                    c.apply(val);
-
                 return;
             }
         }
@@ -998,7 +1015,30 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
             swapMgr.remove(spaceName,
                 swapKey,
-                c,
+                new CI1<byte[]>() {
+                    @Override public void apply(byte[] rmv) {
+                        if (rmv == null)
+                            return;
+
+                        try {
+                            if (cctx.config().isStatisticsEnabled())
+                                cctx.cache().metrics0().onSwapRemove();
+
+                            if (qryMgr == null)
+                                return;
+
+                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+
+                            if (entry == null)
+                                return;
+
+                            qryMgr.onUnswap(key, entry.value());
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                },
                 cctx.deploy().globalLoader());
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260dc2dd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index f89591a..1f54713 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -236,12 +236,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
         try {
             GridUnsafeMemory mem = desc.memory();
 
-            if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0) {
-                if (beforeRmv)
-                    return; // The offheap value is in its place, nothing to do here.
-                else
-                    throw new IllegalStateException("Unswap without swap: " + p);
-            }
+            if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0)
+                return; // The offheap value is in its place, nothing to do here.
 
             Value v = peekValue(VAL_COL);