You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/14 15:58:56 UTC
[1/6] incubator-ignite git commit: #ignite-614: Add delay in FileSwapSpaceSpi.remove only for testing.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-614 [created] 37e60c1a0
#ignite-614: Add delay in FileSwapSpaceSpi.remove only for testing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ee180bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ee180bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ee180bc6
Branch: refs/heads/ignite-614
Commit: ee180bc6e77d5c74f9e27d4889ccf2e19684a924
Parents: 7e599b6
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 13:17:41 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 13:17:41 2015 +0300
----------------------------------------------------------------------
.../spi/swapspace/file/FileSwapSpaceSpi.java | 7 ++
.../cache/GridCacheSwapPreloadSelfTest.java | 102 ++++++++++---------
2 files changed, 59 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ee180bc6/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e7db285..e8e7191 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -392,6 +392,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
byte[] val = space.remove(key, c != null);
+ try {
+ U.sleep(1000);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+
if (c != null)
c.apply(val);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ee180bc6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
index 6176da4..57c3b9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
@@ -156,83 +156,85 @@ public class GridCacheSwapPreloadSelfTest extends GridCommonAbstractTest {
final AtomicBoolean done = new AtomicBoolean();
IgniteInternalFuture<?> fut = null;
- try {
- startGrid(0);
+ for (int attempt = 0; attempt < 10; attempt++) {
+ try {
+ startGrid(0);
- final IgniteCache<Integer, Integer> cache = grid(0).cache(null);
+ final IgniteCache<Integer, Integer> cache = grid(0).cache(null);
- assertNotNull(cache);
+ assertNotNull(cache);
- // Populate.
- for (int i = 0; i < ENTRY_CNT; i++)
- cache.put(i, i);
+ // Populate.
+ for (int i = 0; i < ENTRY_CNT; i++)
+ cache.put(i, i);
- Set<Integer> keys = new HashSet<>();
+ Set<Integer> keys = new HashSet<>();
- for (Cache.Entry<Integer, Integer> entry : cache.localEntries())
- keys.add(entry.getKey());
+ for (Cache.Entry<Integer, Integer> entry : cache.localEntries())
+ keys.add(entry.getKey());
- cache.localEvict(keys);
+ cache.localEvict(keys);
+
+ fut = multithreadedAsync(new Callable<Object>() {
+ @Nullable @Override public Object call() throws Exception {
+ Random rnd = new Random();
- fut = multithreadedAsync(new Callable<Object>() {
- @Nullable @Override public Object call() throws Exception {
- Random rnd = new Random();
+ while (!done.get()) {
+ int key = rnd.nextInt(ENTRY_CNT);
- while (!done.get()) {
- int key = rnd.nextInt(ENTRY_CNT);
+ Integer i = cache.get(key);
- Integer i = cache.get(key);
+ assertNotNull(i);
+ assertEquals(Integer.valueOf(key), i);
- assertNotNull(i);
- assertEquals(Integer.valueOf(key), i);
+ //cache.localEvict(Collections.singleton(rnd.nextInt(ENTRY_CNT)));
+ }
- cache.localEvict(Collections.singleton(rnd.nextInt(ENTRY_CNT)));
+ return null;
}
+ }, 10);
- return null;
- }
- }, 10);
+ startGrid(1);
- startGrid(1);
+ done.set(true);
- done.set(true);
+ fut.get();
- fut.get();
+ fut = null;
- fut = null;
+ int size = grid(1).cache(null).localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP,
+ CachePeekMode.NEAR, CachePeekMode.ONHEAP);
- int size = grid(1).cache(null).localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP,
- CachePeekMode.NEAR, CachePeekMode.ONHEAP);
+ info("New node cache size: " + size);
- info("New node cache size: " + size);
+ if (size != ENTRY_CNT) {
+ Set<Integer> keySet = new TreeSet<>();
- if (size != ENTRY_CNT) {
- Set<Integer> keySet = new TreeSet<>();
+ int next = 0;
- int next = 0;
+ for (IgniteCache.Entry<Integer, Integer> e : grid(1).<Integer, Integer>cache(null).localEntries())
+ keySet.add(e.getKey());
- for (IgniteCache.Entry<Integer, Integer> e : grid(1).<Integer, Integer>cache(null).localEntries())
- keySet.add(e.getKey());
+ for (Integer i : keySet) {
+ while (next < i)
+ info("Missing key: " + next++);
- for (Integer i : keySet) {
- while (next < i)
- info("Missing key: " + next++);
-
- next++;
+ next++;
+ }
}
- }
- assertEquals(ENTRY_CNT, size);
- }
- finally {
- done.set(true);
-
- try {
- if (fut != null)
- fut.get();
+ assertEquals(ENTRY_CNT, size);
}
finally {
- stopAllGrids();
+ done.set(true);
+
+ try {
+ if (fut != null)
+ fut.get();
+ }
+ finally {
+ stopAllGrids();
+ }
}
}
}
[6/6] incubator-ignite git commit: #ignite-614: Remove code for test.
Posted by sb...@apache.org.
#ignite-614: Remove code for test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37e60c1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37e60c1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37e60c1a
Branch: refs/heads/ignite-614
Commit: 37e60c1a0e2e37b4f2d9b472339afb4982eacd53
Parents: 57d7fbd
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 16:54:30 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 16:54:30 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java | 7 -------
1 file changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37e60c1a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e8e7191..e7db285 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -392,13 +392,6 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
byte[] val = space.remove(key, c != null);
- try {
- U.sleep(1000);
- }
- catch (IgniteInterruptedCheckedException e) {
- e.printStackTrace();
- }
-
if (c != null)
c.apply(val);
[4/6] incubator-ignite git commit: #ignite-614: wip.
Posted by sb...@apache.org.
#ignite-614: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dcac35ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dcac35ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dcac35ce
Branch: refs/heads/ignite-614
Commit: dcac35ce2662ef82eb62265f2b5ed7dd19d9ca46
Parents: 87c1275
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 16:08:12 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 16:08:12 2015 +0300
----------------------------------------------------------------------
.../GridCacheEntryInfoCollectSwapListener.java | 68 +++++++++++---------
.../processors/cache/GridCacheSwapListener.java | 4 +-
.../processors/cache/GridCacheSwapManager.java | 11 ++--
.../preloader/GridDhtPartitionSupplyPool.java | 1 +
4 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index bd1746d..9a7511c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.client.util.*;
import org.jsr166.*;
import java.util.*;
@@ -30,11 +32,14 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
/** */
private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new ConcurrentHashMap8<>();
- private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo> notFinishedSwappedEntries = new ConcurrentHashMap8<>();
+ /** Entries in swapping. */
+ private final GridConcurrentHashSet<KeyCacheObject> swappingKeys = new GridConcurrentHashSet();
+ /** Lock for empty condition. */
+ final Lock emptyLock = new ReentrantLock();
- final Lock lock = new ReentrantLock();
- final Condition emptyCond = lock.newCondition();
+ /** Condition for empty swapping entries. */
+ final Condition emptyCond = emptyLock.newCondition();
/** */
private final IgniteLogger log;
@@ -49,27 +54,37 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
/**
* Wait until all entries finish unswapping.
*/
- public void waitUnswapFinished() {
- lock.lock();
- try{
- if (notFinishedSwappedEntries.size() != 0)
- try {
- emptyCond.await();
- }
- catch (InterruptedException e) {
- // No-op.
- }
- } finally {
- lock.unlock();
+ public void waitUnswapFinished() throws IgniteCheckedException {
+ emptyLock.lock();
+
+ try {
+ if (swappingKeys.size() != 0)
+ emptyCond.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ finally {
+ emptyLock.unlock();
}
}
/** {@inheritDoc} */
- @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) throws IgniteCheckedException {
+ @Override public void onEntryUnswapping(KeyCacheObject key) throws IgniteCheckedException {
if (log.isDebugEnabled())
- log.debug("Received unswapped event for key: " + key);
+ log.debug("Received unswapping event for key: " + key);
assert key != null;
+
+ swappingKeys.add(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEntryUnswapped(int part,
+ KeyCacheObject key,
+ GridCacheSwapEntry swapEntry)
+ {
+ assert key != null;
assert swapEntry != null;
GridCacheEntryInfo info = new GridCacheEntryInfo();
@@ -80,26 +95,17 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
info.version(swapEntry.version());
info.value(swapEntry.value());
- notFinishedSwappedEntries.put(key, info);
- }
-
- /** {@inheritDoc} */
- @Override public void onEntryUnswapped(int part,
- KeyCacheObject key,
- GridCacheSwapEntry swapEntry)
- {
- GridCacheEntryInfo info = notFinishedSwappedEntries.remove(key);
+ swappedEntries.put(key, info);
- assert info != null;
+ swappingKeys.remove(key);
- swappedEntries.put(key, info);
+ emptyLock.lock();
- lock.lock();
try{
- if (notFinishedSwappedEntries.size() == 0)
+ if (swappingKeys.size() == 0)
emptyCond.signalAll();
} finally {
- lock.unlock();
+ emptyLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
index d8d0ddc..d6d13ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
@@ -24,12 +24,10 @@ import org.apache.ignite.*;
*/
public interface GridCacheSwapListener {
/**
- * @param part Partition.
* @param key Cache key.
- * @param e Entry.
* @throws IgniteCheckedException If failed.
*/
- public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
+ public void onEntryUnswapping(KeyCacheObject key) throws IgniteCheckedException;
/**
* @param part Partition.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index fed83de..c179b31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -580,8 +580,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
part,
key.valueBytes(cctx.cacheObjectContext()));
- for (GridCacheSwapListener lsnr : swapLsnrs.get(part))
- lsnr.onEntryUnswapping(part, key, entry);
+ Collection<GridCacheSwapListener> lsnrs = swapLsnrs.get(part);
+
+ if (lsnrs != null) {
+ for (GridCacheSwapListener lsnr : lsnrs)
+ lsnr.onEntryUnswapping(key);
+ }
swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
@@ -595,7 +599,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
t.set(entry);
CacheObject v = entry.value();
- byte[] valBytes = entry.valueBytes();
// Event notification.
if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNSWAPPED)) {
@@ -1928,7 +1931,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/** {@inheritDoc} */
- @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e)
+ @Override public void onEntryUnswapping(KeyCacheObject key)
throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 39fd9ac..3b93c09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -411,6 +411,7 @@ class GridDhtPartitionSupplyPool<K, V> {
// Stop receiving promote notifications.
if (swapLsnr != null) {
swapLsnr.waitUnswapFinished();
+
cctx.swap().removeOffHeapListener(part, swapLsnr);
cctx.swap().removeSwapListener(part, swapLsnr);
}
[3/6] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-614
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-614
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/87c12750
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/87c12750
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/87c12750
Branch: refs/heads/ignite-614
Commit: 87c12750beb4b35b7fd7d82214593fd819e57f4f
Parents: 574d426 593e3ee
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:43:40 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:43:40 2015 +0300
----------------------------------------------------------------------
bin/ignite-schema-import.bat | 2 +-
bin/ignite-schema-import.sh | 2 +-
bin/ignite.bat | 2 +-
bin/ignitevisorcmd.bat | 2 +-
bin/ignitevisorcmd.sh | 2 +-
.../processors/cache/GridCacheAdapter.java | 119 +++++------
.../GridDistributedCacheAdapter.java | 210 ++++++++++++-------
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../cache/CacheRemoveAllSelfTest.java | 81 +++++++
.../near/NoneRebalanceModeSelfTest.java | 67 ++++++
.../testsuites/IgniteCacheTestSuite2.java | 1 +
.../testsuites/IgniteCacheTestSuite4.java | 2 +
12 files changed, 344 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
[5/6] incubator-ignite git commit: #ignite-614: wip.
Posted by sb...@apache.org.
#ignite-614: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/57d7fbd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/57d7fbd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/57d7fbd8
Branch: refs/heads/ignite-614
Commit: 57d7fbd8f084a43614c1100f2b5578a599d9b008
Parents: dcac35c
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 16:16:46 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 16:16:46 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryInfoCollectSwapListener.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d7fbd8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index 9a7511c..9cf40cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -104,7 +104,8 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
try{
if (swappingKeys.size() == 0)
emptyCond.signalAll();
- } finally {
+ }
+ finally {
emptyLock.unlock();
}
}
[2/6] incubator-ignite git commit: #ignite-614: wip.
Posted by sb...@apache.org.
#ignite-614: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/574d4269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/574d4269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/574d4269
Branch: refs/heads/ignite-614
Commit: 574d4269bf321d252a6221984833567fc44fe5b3
Parents: ee180bc
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:25:16 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:25:16 2015 +0300
----------------------------------------------------------------------
.../GridCacheEntryInfoCollectSwapListener.java | 50 ++++++++++++++++++--
.../processors/cache/GridCacheSwapListener.java | 8 ++++
.../processors/cache/GridCacheSwapManager.java | 11 ++++-
.../preloader/GridDhtPartitionSupplyPool.java | 1 +
.../cache/GridCacheSwapPreloadSelfTest.java | 2 +-
5 files changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index e227b43..bd1746d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.jsr166.*;
import java.util.*;
+import java.util.concurrent.locks.*;
/**
*
@@ -29,6 +30,12 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
/** */
private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new ConcurrentHashMap8<>();
+ private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo> notFinishedSwappedEntries = new ConcurrentHashMap8<>();
+
+
+ final Lock lock = new ReentrantLock();
+ final Condition emptyCond = lock.newCondition();
+
/** */
private final IgniteLogger log;
@@ -39,11 +46,26 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
this.log = log;
}
+ /**
+ * Wait until all entries finish unswapping.
+ */
+ public void waitUnswapFinished() {
+ lock.lock();
+ try{
+ if (notFinishedSwappedEntries.size() != 0)
+ try {
+ emptyCond.await();
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
/** {@inheritDoc} */
- @Override public void onEntryUnswapped(int part,
- KeyCacheObject key,
- GridCacheSwapEntry swapEntry)
- {
+ @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Received unswapped event for key: " + key);
@@ -58,7 +80,27 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
info.version(swapEntry.version());
info.value(swapEntry.value());
+ notFinishedSwappedEntries.put(key, info);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEntryUnswapped(int part,
+ KeyCacheObject key,
+ GridCacheSwapEntry swapEntry)
+ {
+ GridCacheEntryInfo info = notFinishedSwappedEntries.remove(key);
+
+ assert info != null;
+
swappedEntries.put(key, info);
+
+ lock.lock();
+ try{
+ if (notFinishedSwappedEntries.size() == 0)
+ emptyCond.signalAll();
+ } finally {
+ lock.unlock();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
index 811b951f..d8d0ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
@@ -29,5 +29,13 @@ public interface GridCacheSwapListener {
* @param e Entry.
* @throws IgniteCheckedException If failed.
*/
+ public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
+
+ /**
+ * @param part Partition.
+ * @param key Cache key.
+ * @param e Entry.
+ * @throws IgniteCheckedException If failed.
+ */
public void onEntryUnswapped(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..fed83de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -580,7 +580,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
part,
key.valueBytes(cctx.cacheObjectContext()));
- swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
+ for (GridCacheSwapListener lsnr : swapLsnrs.get(part))
+ lsnr.onEntryUnswapping(part, key, entry);
+
+ swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
if (rmv != null) {
try {
@@ -1925,6 +1928,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/** {@inheritDoc} */
+ @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e)
+ throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onEntryUnswapped(int part,
KeyCacheObject key,
GridCacheSwapEntry e) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..39fd9ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -410,6 +410,7 @@ class GridDhtPartitionSupplyPool<K, V> {
// Stop receiving promote notifications.
if (swapLsnr != null) {
+ swapLsnr.waitUnswapFinished();
cctx.swap().removeOffHeapListener(part, swapLsnr);
cctx.swap().removeSwapListener(part, swapLsnr);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
index 57c3b9e..18245db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
@@ -42,7 +42,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
*/
public class GridCacheSwapPreloadSelfTest extends GridCommonAbstractTest {
/** Entry count. */
- private static final int ENTRY_CNT = 15_000;
+ private static final int ENTRY_CNT = 150_000;
/** */
private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);