You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/03 11:19:49 UTC
[05/26] incubator-ignite git commit: ignite-973-2 - read offheap value before remove
ignite-973-2 - read offheap value before remove
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/260dc2dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/260dc2dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/260dc2dd
Branch: refs/heads/ignite-gg-10460
Commit: 260dc2dd4978d0a57732b7edd0aa0b043d4eff4c
Parents: 285d790
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 23 15:28:24 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 23 15:28:24 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheSwapManager.java | 192 +++++++++++--------
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 8 +-
2 files changed, 118 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260dc2dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index f709e03..e45ec2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -535,21 +535,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
// First try removing from offheap.
if (offheapEnabled) {
- byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
-
- if (cctx.config().isStatisticsEnabled()) {
- if (entryBytes != null)
- cctx.cache().metrics0().onOffHeapRemove();
-
- cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
- }
-
- if (entryBytes != null) {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
-
- if (entry == null)
- return null;
+ GridCacheSwapEntry entry = removeFromOffheap(key, key.valueBytes(cctx.cacheObjectContext()), part);
+ if (entry != null) {
// Always fire this event, since preloading depends on it.
onOffHeaped(part, key, entry);
@@ -569,11 +557,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
null,
null);
- GridCacheQueryManager qryMgr = cctx.queries();
-
- if (qryMgr != null)
- qryMgr.onUnswap(key, entry.value());
-
return entry;
}
}
@@ -737,6 +720,47 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * @param key Key.
+ * @param keyBytes Key bytes.
+ * @param part Partition.
+ * @return Swap entry.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private GridCacheSwapEntry removeFromOffheap(KeyCacheObject key, byte[] keyBytes, int part)
+ throws IgniteCheckedException {
+ final GridCacheQueryManager qryMgr = cctx.queries();
+
+ GridCacheSwapEntry entry;
+
+ if (qryMgr != null) {
+ entry = readOffheapBeforeRemove(key, keyBytes, part);
+
+ if (entry != null) {
+ if (offheap.removex(spaceName, part, key, keyBytes)) {
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
+ }
+ else
+ entry = null; // Failed to remove -> reset to null.
+ }
+ }
+ else {
+ byte[] entryBytes = offheap.remove(spaceName, part, key, keyBytes);
+
+ if (entryBytes != null) {
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
+
+ entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ }
+ else
+ entry = null;
+ }
+
+ return entry;
+ }
+
+ /**
* @param keys Collection of keys to remove from swap.
* @return Collection of swap entries.
* @throws IgniteCheckedException If failed,
@@ -759,40 +783,30 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
for (KeyCacheObject key : keys) {
int part = cctx.affinity().partition(key);
- byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ GridCacheSwapEntry entry = removeFromOffheap(key, key.valueBytes(cctx.cacheObjectContext()), part);
- if(entryBytes != null && cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onOffHeapRemove();
+ if (entry != null) {
+ // Always fire this event, since preloading depends on it.
+ onOffHeaped(part, key, entry);
- if (entryBytes != null) {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP))
+ cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
+ EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null);
- if (entry != null) {
- // Always fire this event, since preloading depends on it.
- onOffHeaped(part, key, entry);
-
- if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP))
- cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
- EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null);
-
- if (qryMgr != null)
- qryMgr.onUnswap(key, entry.value());
-
- GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key,
- part,
- ByteBuffer.wrap(entry.valueBytes()),
- entry.type(),
- entry.version(), entry.ttl(),
- entry.expireTime(),
- entry.keyClassLoaderId(),
- entry.valueClassLoaderId());
+ GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key,
+ part,
+ ByteBuffer.wrap(entry.valueBytes()),
+ entry.type(),
+ entry.version(), entry.ttl(),
+ entry.expireTime(),
+ entry.keyClassLoaderId(),
+ entry.valueClassLoaderId());
- unswapped.value(entry.value());
+ unswapped.value(entry.value());
- res.add(unswapped);
+ res.add(unswapped);
- continue;
- }
+ continue;
}
if (swapEnabled) {
@@ -940,6 +954,34 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/**
+ * Reads value from offheap and unswaps it for indexing.
+ *
+ * @param key Key.
+ * @param keyBytes Key bytes.
+ * @param part Partition.
+ * @return Swap entry.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
+ throws IgniteCheckedException {
+ assert cctx.queries() != null;
+
+ byte[] val = offheap.get(spaceName, part, key, keyBytes);
+
+ if (val != null) {
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(val));
+
+ if (entry != null) {
+ cctx.queries().onUnswap(key, entry.value());
+
+ return entry;
+ }
+ }
+
+ return null;
+ }
+
+ /**
* @param key Key to remove.
* @throws IgniteCheckedException If failed.
*/
@@ -951,42 +993,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
final GridCacheQueryManager qryMgr = cctx.queries();
- CI1<byte[]> c = qryMgr == null ? null : new CI1<byte[]>() {
- @Override public void apply(byte[] rmv) {
- if (rmv == null)
- return;
-
- try {
- if (cctx.config().isStatisticsEnabled())
- cctx.cache().metrics0().onSwapRemove();
-
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
-
- if (entry == null)
- return;
-
- qryMgr.onUnswap(key, entry.value());
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- };
-
int part = cctx.affinity().partition(key);
// First try offheap.
if (offheapEnabled) {
- // TODO Pass closure c to offheap.remove and apply it before the actual remove.
- byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
- if (val != null) {
+ if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+ offheap.removex(spaceName, part, key, keyBytes)) {
if (cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onOffHeapRemove();
- if (c != null)
- c.apply(val);
-
return;
}
}
@@ -998,7 +1015,30 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
swapMgr.remove(spaceName,
swapKey,
- c,
+ new CI1<byte[]>() {
+ @Override public void apply(byte[] rmv) {
+ if (rmv == null)
+ return;
+
+ try {
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapRemove();
+
+ if (qryMgr == null)
+ return;
+
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+
+ if (entry == null)
+ return;
+
+ qryMgr.onUnswap(key, entry.value());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ },
cctx.deploy().globalLoader());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260dc2dd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index f89591a..1f54713 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -236,12 +236,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
try {
GridUnsafeMemory mem = desc.memory();
- if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0) {
- if (beforeRmv)
- return; // The offheap value is in its place, nothing to do here.
- else
- throw new IllegalStateException("Unswap without swap: " + p);
- }
+ if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0)
+ return; // The offheap value is in its place, nothing to do here.
Value v = peekValue(VAL_COL);