You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/23 03:36:34 UTC
[05/23] ignite git commit: ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing
ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621eb0f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621eb0f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621eb0f7
Branch: refs/heads/ignite-1171
Commit: 621eb0f75bbe1a0a623229dded38a3549309eead
Parents: 8b94494
Author: sboikov <se...@inria.fr>
Authored: Mon Sep 21 21:37:52 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon Sep 21 21:37:52 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 37 +++++++++++++-------
.../processors/cache/GridCacheProcessor.java | 2 +-
.../processors/cache/GridCacheSwapManager.java | 24 ++++++-------
.../datastreamer/DataStreamerImpl.java | 2 --
4 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f2bb646..961c792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1588,6 +1588,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hasValPtr = hasOffHeapPointer();
+ if (old == null)
+ old = saveValueForIndexUnlocked();
+
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(old);
@@ -2163,6 +2166,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Must persist inside synchronization in non-tx mode.
cctx.store().remove(null, keyValue(false));
+ if (oldVal == null)
+ oldVal = saveValueForIndexUnlocked();
+
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(oldVal);
@@ -3342,7 +3348,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
synchronized (this) {
- CacheObject expiredVal = saveValueForIndexUnlocked();
+ CacheObject expiredVal = saveOldValueUnlocked(false);
boolean hasOldBytes = hasOffHeapPointer();
@@ -3523,12 +3529,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null && qryMgr.enabled()) {
- qryMgr.store(key,
- val,
- ver,
- expireTime);
- }
+ if (qryMgr.enabled())
+ qryMgr.store(key, val, ver, expireTime);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
@@ -3547,8 +3549,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
- if (qryMgr != null)
- qryMgr.remove(key(), prevVal == null ? null : prevVal);
+ if (qryMgr.enabled())
+ qryMgr.remove(key(), prevVal);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
@@ -3562,10 +3564,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @return Previous value or {@code null}.
* @throws IgniteCheckedException If failed to retrieve previous value.
*/
- protected CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+ protected final CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+ return saveOldValueUnlocked(true);
+ }
+
+ /**
+ * @param qryOnly If {@code true} reads old value only if query indexing is enabled.
+ * @return Previous value or {@code null}.
+ * @throws IgniteCheckedException If failed to retrieve previous value.
+ */
+ private CacheObject saveOldValueUnlocked(boolean qryOnly) throws IgniteCheckedException {
assert Thread.holdsLock(this);
- if (cctx.queries() == null)
+ if (qryOnly && !cctx.queries().enabled())
return null;
CacheObject val = rawGetOrUnmarshalUnlocked(false);
@@ -3681,7 +3692,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (obsoleteVersionExtras() != null)
return true;
- CacheObject prev = saveValueForIndexUnlocked();
+ CacheObject prev = saveOldValueUnlocked(false);
if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
if (swap) {
@@ -3791,7 +3802,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onUnswap(key, prevVal);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c92de7d..7c16136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2759,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (spaceName.equals(CU.swapSpaceName(cctx))) {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null) {
+ if (qryMgr.enabled()) {
try {
KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 9b6381e..d9a8b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -696,12 +696,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
final GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+ if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
return null; // Not found.
swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
- if (qryMgr == null && cctx.config().isStatisticsEnabled())
+ if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onSwapRead(rmv != null);
if (rmv != null) {
@@ -843,7 +843,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridCacheSwapEntry entry;
- if (qryMgr != null) {
+ if (qryMgr.enabled()) {
entry = readOffheapBeforeRemove(key, keyBytes, part);
if (entry != null) {
@@ -952,7 +952,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
ClassLoader ldr = cctx.deploy().globalLoader();
- if (qryMgr != null) { // Unswap for indexing.
+ if (qryMgr.enabled()) { // Unswap for indexing.
Iterator<SwapKey> iter = unprocessedKeys.iterator();
while (iter.hasNext()) {
@@ -967,7 +967,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
unprocessedKeys,
new IgniteBiInClosure<SwapKey, byte[]>() {
@Override public void apply(SwapKey swapKey, byte[] rmv) {
- if (qryMgr == null && cctx.config().isStatisticsEnabled())
+ if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onSwapRead(rmv != null);
if (rmv != null) {
@@ -1124,7 +1124,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
*/
public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
throws IgniteCheckedException {
- assert cctx.queries() != null;
+ assert cctx.queries().enabled();
byte[] entryBytes = offheap.get(spaceName, part, key, keyBytes);
@@ -1155,7 +1155,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
*/
private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr)
throws IgniteCheckedException {
- assert cctx.queries() != null;
+ assert cctx.queries().enabled();
byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr);
@@ -1196,7 +1196,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (offheapEnabled) {
byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
- if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+ if ((!qryMgr.enabled() || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
offheap.removex(spaceName, part, key, keyBytes)) {
if (cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onOffHeapRemove();
@@ -1212,7 +1212,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
ClassLoader ldr = cctx.deploy().globalLoader();
- if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+ if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
return; // Not found.
swapMgr.remove(spaceName,
@@ -1279,7 +1279,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(key);
}
@@ -1308,7 +1308,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
(IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(swapEntry.key());
}
}
@@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.events().addEvent(batchSwapEntry.partition(), batchSwapEntry.key(), cctx.nodeId(),
(IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null);
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(batchSwapEntry.key());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index b5d9a7d..ab2a6e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1569,8 +1569,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
- entry.unswap(false);
-
if (plc != null) {
ttl = CU.toTtl(plc.getExpiryForCreation());