You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/14 15:58:57 UTC
[2/6] incubator-ignite git commit: #ignite-614: wip.
#ignite-614: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/574d4269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/574d4269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/574d4269
Branch: refs/heads/ignite-614
Commit: 574d4269bf321d252a6221984833567fc44fe5b3
Parents: ee180bc
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:25:16 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:25:16 2015 +0300
----------------------------------------------------------------------
.../GridCacheEntryInfoCollectSwapListener.java | 50 ++++++++++++++++++--
.../processors/cache/GridCacheSwapListener.java | 8 ++++
.../processors/cache/GridCacheSwapManager.java | 11 ++++-
.../preloader/GridDhtPartitionSupplyPool.java | 1 +
.../cache/GridCacheSwapPreloadSelfTest.java | 2 +-
5 files changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index e227b43..bd1746d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.jsr166.*;
import java.util.*;
+import java.util.concurrent.locks.*;
/**
*
@@ -29,6 +30,12 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
/** */
private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new ConcurrentHashMap8<>();
+ private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo> notFinishedSwappedEntries = new ConcurrentHashMap8<>();
+
+
+ final Lock lock = new ReentrantLock();
+ final Condition emptyCond = lock.newCondition();
+
/** */
private final IgniteLogger log;
@@ -39,11 +46,26 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
this.log = log;
}
+ /**
+ * Wait until all entries finish unswapping.
+ */
+ public void waitUnswapFinished() {
+ lock.lock();
+ try{
+ if (notFinishedSwappedEntries.size() != 0)
+ try {
+ emptyCond.await();
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
/** {@inheritDoc} */
- @Override public void onEntryUnswapped(int part,
- KeyCacheObject key,
- GridCacheSwapEntry swapEntry)
- {
+ @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Received unswapped event for key: " + key);
@@ -58,7 +80,27 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
info.version(swapEntry.version());
info.value(swapEntry.value());
+ notFinishedSwappedEntries.put(key, info);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEntryUnswapped(int part,
+ KeyCacheObject key,
+ GridCacheSwapEntry swapEntry)
+ {
+ GridCacheEntryInfo info = notFinishedSwappedEntries.remove(key);
+
+ assert info != null;
+
swappedEntries.put(key, info);
+
+ lock.lock();
+ try{
+ if (notFinishedSwappedEntries.size() == 0)
+ emptyCond.signalAll();
+ } finally {
+ lock.unlock();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
index 811b951f..d8d0ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
@@ -29,5 +29,13 @@ public interface GridCacheSwapListener {
* @param e Entry.
* @throws IgniteCheckedException If failed.
*/
+ public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
+
+ /**
+ * @param part Partition.
+ * @param key Cache key.
+ * @param e Entry.
+ * @throws IgniteCheckedException If failed.
+ */
public void onEntryUnswapped(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..fed83de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -580,7 +580,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
part,
key.valueBytes(cctx.cacheObjectContext()));
- swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
+ for (GridCacheSwapListener lsnr : swapLsnrs.get(part))
+ lsnr.onEntryUnswapping(part, key, entry);
+
+ swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
if (rmv != null) {
try {
@@ -1925,6 +1928,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
}
/** {@inheritDoc} */
+ @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e)
+ throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onEntryUnswapped(int part,
KeyCacheObject key,
GridCacheSwapEntry e) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..39fd9ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -410,6 +410,7 @@ class GridDhtPartitionSupplyPool<K, V> {
// Stop receiving promote notifications.
if (swapLsnr != null) {
+ swapLsnr.waitUnswapFinished();
cctx.swap().removeOffHeapListener(part, swapLsnr);
cctx.swap().removeSwapListener(part, swapLsnr);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
index 57c3b9e..18245db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
@@ -42,7 +42,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
*/
public class GridCacheSwapPreloadSelfTest extends GridCommonAbstractTest {
/** Entry count. */
- private static final int ENTRY_CNT = 15_000;
+ private static final int ENTRY_CNT = 150_000;
/** */
private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);