You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/12 08:05:09 UTC

[5/5] ignite git commit: ignite-2586 If swap is not enabled, the value evicted from offheap needs to be passed to the query manager

ignite-2586 If swap is not enabled, the value evicted from offheap needs to be passed to the query manager


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0945577
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0945577
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0945577

Branch: refs/heads/ignite-2586
Commit: b094557765cb284f089647c680d77878215da939
Parents: 84de111
Author: sboikov <sb...@gridgain.com>
Authored: Fri Feb 12 10:04:25 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 12 10:04:25 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |  7 +++-
 .../processors/cache/GridCacheMapEntry.java     | 12 ++++--
 .../processors/cache/GridCacheProcessor.java    |  4 +-
 .../processors/cache/GridCacheSwapManager.java  | 43 +++++++++++++-------
 .../processors/cache/GridCacheTestEntryEx.java  |  2 +-
 .../IgniteCacheQuerySelfTestSuite.java          |  3 ++
 6 files changed, 48 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 953c301..9bee307 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -920,7 +920,10 @@ public interface GridCacheEntryEx {
     public void updateTtl(@Nullable GridCacheVersion ver, long ttl);
 
     /**
-     * Tries to do offheap -> swap eviction.
+     * Called when entry should be evicted from offheap.
+     * <p>
+     * If swap is enabled, tries to do offheap -> swap eviction; otherwise the evicted value should
+     * be passed to the query manager.
      *
      * @param entry Serialized swap entry.
      * @param evictVer Version when entry was selected for eviction.
@@ -929,7 +932,7 @@ public interface GridCacheEntryEx {
      * @throws GridCacheEntryRemovedException If entry was removed.
      * @return {@code True} if entry was obsoleted and written to swap.
      */
-    public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+    public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 1c8fa53..c1eeb5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -427,9 +427,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+    @Override public boolean onOffheapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this;
+        assert cctx.swap().offHeapEnabled() && (cctx.swap().swapEnabled() || cctx.queries().enabled()) : this;
 
         boolean obsolete;
 
@@ -444,12 +444,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (mvcc != null && !mvcc.isEmpty(obsoleteVer))
                 return false;
 
-            if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
+            if (cctx.swap().onOffheapEvict(key, entry, partition(), evictVer)) {
                 assert !hasValueUnlocked() : this;
 
                 obsolete = markObsolete0(obsoleteVer, false, null);
 
                 assert obsolete : this;
+
+                if (!cctx.swap().swapEnabled()) {
+                    CacheObject val = cctx.swap().unmarshalSwapEntryValue(entry);
+
+                    clearIndex(val);
+                }
             }
             else
                 obsolete = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5acad6c..a45c2e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2868,9 +2868,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 
-                    GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
-
-                    CacheObject val = swapEntry.value();
+                    CacheObject val = cctx.swap().unmarshalSwapEntryValue(valBytes);
 
                     assert val != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 37b5e15..08fe5a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionAware;
 import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
@@ -105,8 +106,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     /** Soft iterator set. */
     private final Collection<GridWeakIterator<Map.Entry>> itSet = new GridConcurrentHashSet<>();
 
-    /** {@code True} if offheap to swap eviction is possible. */
-    private boolean offheapToSwapEvicts;
+    /** {@code True} if evictions from offheap need to be processed. */
+    private boolean unwindOffheapEvicts;
 
     /** Values to be evicted from offheap to swap. */
     private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> offheapEvicts = new ThreadLocal<>();
@@ -140,7 +141,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      *
      */
     public void unwindOffheapEvicts() {
-        if (!offheapToSwapEvicts)
+        if (!unwindOffheapEvicts)
             return;
 
         Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get();
@@ -161,7 +162,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         GridCacheEntryEx entry = cctx.cache().entryEx(key);
 
                         try {
-                            if (entry.offheapSwapEvict(vb, evictVer, obsoleteVer))
+                            if (entry.onOffheapEvict(vb, evictVer, obsoleteVer))
                                 cctx.cache().removeEntry(entry);
 
                             break;
@@ -198,12 +199,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridOffHeapEvictListener lsnr;
 
-        if (swapEnabled) {
-            offheapToSwapEvicts = true;
+        if (swapEnabled || GridQueryProcessor.isEnabled(cctx.config())) {
+            unwindOffheapEvicts = true;
 
             lsnr = new GridOffHeapEvictListener() {
                 @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
-                    assert offheapToSwapEvicts;
+                    assert unwindOffheapEvicts;
 
                     onOffheapEvict();
 
@@ -1075,7 +1076,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return {@code True} if removed.
      * @throws IgniteCheckedException If failed.
      */
-    boolean offheapSwapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver)
+    boolean onOffheapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver)
         throws IgniteCheckedException {
         assert offheapEnabled;
 
@@ -1095,13 +1096,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
 
             if (lsnrs != null) {
-                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
+                GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry, false));
 
                 for (GridCacheSwapListener lsnr : lsnrs)
                     lsnr.onEntryUnswapped(part, key, e);
             }
 
-            cctx.swap().writeToSwap(part, key, entry);
+            if (swapEnabled)
+                cctx.swap().writeToSwap(part, key, entry);
         }
 
         return rmv;
@@ -2141,6 +2143,19 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param bytes Bytes to unmarshal.
+     * @return Unmarshalled value.
+     * @throws IgniteCheckedException If failed.
+     */
+    public CacheObject unmarshalSwapEntryValue(byte[] bytes) throws IgniteCheckedException {
+        GridCacheSwapEntry swapEntry = swapEntry(GridCacheSwapEntryImpl.unmarshal(bytes, true));
+
+        assert swapEntry != null && swapEntry.value() != null : swapEntry;
+
+        return swapEntry.value();
+    }
+
+    /**
+     * @param bytes Bytes to unmarshal.
      * @param valOnly If {@code true} unmarshalls only value.
      * @return Unmarshalled entry.
      */
@@ -2191,9 +2206,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
             Map.Entry<byte[], byte[]> e = iter.nextX();
 
-            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
+            GridCacheSwapEntry unmarshalled = swapEntry(unmarshalSwapEntry(e.getValue(), false));
 
-            return F.t(e.getKey(), swapEntry(unmarshalled));
+            return F.t(e.getKey(), unmarshalled);
         }
 
         /** {@inheritDoc} */
@@ -2499,9 +2514,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override public V getValue() {
             try {
-                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
+                GridCacheSwapEntry e = swapEntry(unmarshalSwapEntry(entry.getValue(), false));
 
-                swapEntry(e);
+                assert e != null;
 
                 return e.value().value(cctx.cacheObjectContext(), false);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index e983c7e..e627083 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -849,7 +849,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     }
 
     /** {@inheritDoc} */
-    @Override public boolean offheapSwapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+    @Override public boolean onOffheapEvict(byte[] vb, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0945577/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..ceed4aa 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheLocalQueryMetricsSelfTest;
 import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsDistributedSelfTest;
 import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsLocalSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheQueryOffheapEvictDataLostTest;
 import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsDistributedSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheScanPartitionQueryFallbackSelfTest;
@@ -222,6 +223,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
 
         suite.addTestSuite(GridOrderedMessageCancelSelfTest.class);
 
+        suite.addTestSuite(CacheQueryOffheapEvictDataLostTest.class);
+
         // Ignite cache and H2 comparison.
         suite.addTestSuite(BaseH2CompareQueryTest.class);
         suite.addTestSuite(H2CompareBigQueryTest.class);