You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/12 08:05:09 UTC
[5/5] ignite git commit: ignite-2586 If swap is not enabled need pass
value evicted from offheap to query manager
ignite-2586 If swap is not enabled need pass value evicted from offheap to query manager
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0945577
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0945577
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0945577
Branch: refs/heads/ignite-2586
Commit: b094557765cb284f089647c680d77878215da939
Parents: 84de111
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 12 10:04:25 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 12 10:04:25 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 7 +++-
.../processors/cache/GridCacheMapEntry.java | 12 ++++--
.../processors/cache/GridCacheProcessor.java | 4 +-
.../processors/cache/GridCacheSwapManager.java | 43 +++++++++++++-------
.../processors/cache/GridCacheTestEntryEx.java | 2 +-
.../IgniteCacheQuerySelfTestSuite.java | 3 ++
6 files changed, 48 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 953c301..9bee307 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -920,7 +920,10 @@ public interface GridCacheEntryEx {
public void updateTtl(@Nullable GridCacheVersion ver, long ttl);
/**
- * Tries to do offheap -> swap eviction.
+ * Called when entry should be evicted from offheap.
+ * <p>
+ * If swap is enabled tries to do offheap -> swap eviction, otherwise evicted value should
+ * be passed to query manager.
*
* @param entry Serialized swap entry.
* @param evictVer Version when entry was selected for eviction.
@@ -929,7 +932,7 @@ public interface GridCacheEntryEx {
* @throws GridCacheEntryRemovedException If entry was removed.
* @return {@code True} if entry was obsoleted and written to swap.
*/
- public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+ public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 1c8fa53..c1eeb5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -427,9 +427,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
- @Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+ @Override public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
throws IgniteCheckedException, GridCacheEntryRemovedException {
- assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this;
+ assert cctx.swap().offHeapEnabled() && (cctx.swap().swapEnabled() || cctx.queries().enabled()) : this;
boolean obsolete;
@@ -444,12 +444,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (mvcc != null && !mvcc.isEmpty(obsoleteVer))
return false;
- if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
+ if (cctx.swap().onOffheapEvict(key, entry, partition(), evictVer)) {
assert !hasValueUnlocked() : this;
obsolete = markObsolete0(obsoleteVer, false, null);
assert obsolete : this;
+
+ if (!cctx.swap().swapEnabled()) {
+ CacheObject val = cctx.swap().unmarshalSwapEntryValue(entry);
+
+ clearIndex(val);
+ }
}
else
obsolete = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5acad6c..a45c2e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2868,9 +2868,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
- GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
-
- CacheObject val = swapEntry.value();
+ CacheObject val = cctx.swap().unmarshalSwapEntryValue(valBytes);
assert val != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 37b5e15..08fe5a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionAware;
import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
@@ -105,8 +106,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** Soft iterator set. */
private final Collection<GridWeakIterator<Map.Entry>> itSet = new GridConcurrentHashSet<>();
- /** {@code True} if offheap to swap eviction is possible. */
- private boolean offheapToSwapEvicts;
+ /** {@code True} if need process evictions from offheap. */
+ private boolean unwindOffheapEvicts;
/** Values to be evicted from offheap to swap. */
private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> offheapEvicts = new ThreadLocal<>();
@@ -140,7 +141,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
*
*/
public void unwindOffheapEvicts() {
- if (!offheapToSwapEvicts)
+ if (!unwindOffheapEvicts)
return;
Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get();
@@ -161,7 +162,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridCacheEntryEx entry = cctx.cache().entryEx(key);
try {
- if (entry.offheapSwapEvict(vb, evictVer, obsoleteVer))
+ if (entry.onOffheapEvict(vb, evictVer, obsoleteVer))
cctx.cache().removeEntry(entry);
break;
@@ -198,12 +199,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridOffHeapEvictListener lsnr;
- if (swapEnabled) {
- offheapToSwapEvicts = true;
+ if (swapEnabled || GridQueryProcessor.isEnabled(cctx.config())) {
+ unwindOffheapEvicts = true;
lsnr = new GridOffHeapEvictListener() {
@Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
- assert offheapToSwapEvicts;
+ assert unwindOffheapEvicts;
onOffheapEvict();
@@ -1075,7 +1076,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return {@code True} if removed.
* @throws IgniteCheckedException If failed.
*/
- boolean offheapSwapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver)
+ boolean onOffheapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver)
throws IgniteCheckedException {
assert offheapEnabled;
@@ -1095,13 +1096,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
if (lsnrs != null) {
- GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
+ GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry, false));
for (GridCacheSwapListener lsnr : lsnrs)
lsnr.onEntryUnswapped(part, key, e);
}
- cctx.swap().writeToSwap(part, key, entry);
+ if (swapEnabled)
+ cctx.swap().writeToSwap(part, key, entry);
}
return rmv;
@@ -2141,6 +2143,19 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param bytes Bytes to unmarshal.
+ * @return Unmarshalled values.
+ * @throws IgniteCheckedException If failed.
+ */
+ public CacheObject unmarshalSwapEntryValue(byte[] bytes) throws IgniteCheckedException {
+ GridCacheSwapEntry swapEntry = swapEntry(GridCacheSwapEntryImpl.unmarshal(bytes, true));
+
+ assert swapEntry != null && swapEntry.value() != null : swapEntry;
+
+ return swapEntry.value();
+ }
+
+ /**
+ * @param bytes Bytes to unmarshal.
* @param valOnly If {@code true} unmarshalls only value.
* @return Unmarshalled entry.
*/
@@ -2191,9 +2206,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
@Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
Map.Entry<byte[], byte[]> e = iter.nextX();
- GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
+ GridCacheSwapEntry unmarshalled = swapEntry(unmarshalSwapEntry(e.getValue(), false));
- return F.t(e.getKey(), swapEntry(unmarshalled));
+ return F.t(e.getKey(), unmarshalled);
}
/** {@inheritDoc} */
@@ -2499,9 +2514,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override public V getValue() {
try {
- GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
+ GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry.getValue(), false));
- swapEntry(e);
+ assert e != null;
return e.value().value(cctx.cacheObjectContext(), false);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index e983c7e..e627083 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -849,7 +849,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** {@inheritDoc} */
- @Override public boolean offheapSwapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+ @Override public boolean onOffheapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
throws IgniteCheckedException, GridCacheEntryRemovedException {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..ceed4aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.CacheLocalQueryMetricsSelfTest;
import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsDistributedSelfTest;
import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsLocalSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheQueryOffheapEvictDataLostTest;
import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsDistributedSelfTest;
import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsLocalSelfTest;
import org.apache.ignite.internal.processors.cache.CacheScanPartitionQueryFallbackSelfTest;
@@ -222,6 +223,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridOrderedMessageCancelSelfTest.class);
+ suite.addTestSuite(CacheQueryOffheapEvictDataLostTest.class);
+
// Ignite cache and H2 comparison.
suite.addTestSuite(BaseH2CompareQueryTest.class);
suite.addTestSuite(H2CompareBigQueryTest.class);