You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/14 15:58:57 UTC

[2/6] incubator-ignite git commit: #ignite-614: wip.

#ignite-614: wip.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/574d4269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/574d4269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/574d4269

Branch: refs/heads/ignite-614
Commit: 574d4269bf321d252a6221984833567fc44fe5b3
Parents: ee180bc
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:25:16 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:25:16 2015 +0300

----------------------------------------------------------------------
 .../GridCacheEntryInfoCollectSwapListener.java  | 50 ++++++++++++++++++--
 .../processors/cache/GridCacheSwapListener.java |  8 ++++
 .../processors/cache/GridCacheSwapManager.java  | 11 ++++-
 .../preloader/GridDhtPartitionSupplyPool.java   |  1 +
 .../cache/GridCacheSwapPreloadSelfTest.java     |  2 +-
 5 files changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index e227b43..bd1746d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.jsr166.*;
 
 import java.util.*;
+import java.util.concurrent.locks.*;
 
 /**
  *
@@ -29,6 +30,12 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
     /** */
     private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new ConcurrentHashMap8<>();
 
+    /** Entries put by {@code onEntryUnswapping} and not yet removed by {@code onEntryUnswapped}. */
+    private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo>  notFinishedSwappedEntries = new ConcurrentHashMap8<>();
+    /** Lock held while waiting on or signalling {@link #emptyCond}. */
+    final Lock lock = new ReentrantLock();
+    final Condition emptyCond  = lock.newCondition(); // Signalled when notFinishedSwappedEntries is found empty.
+
     /** */
     private final IgniteLogger log;
 
@@ -39,11 +46,26 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
         this.log = log;
     }
 
+    /**
+     * Waits until no entry is mid-unswap. If interrupted, returns early with the interrupt flag restored.
+     */
+    public void waitUnswapFinished() {
+        lock.lock();
+        try {
+            // Re-check in a loop: Condition.await() may wake up spuriously.
+            while (!notFinishedSwappedEntries.isEmpty())
+                emptyCond.await();
+        }
+        catch (InterruptedException ignored) {
+            Thread.currentThread().interrupt();
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
     /** {@inheritDoc} */
-    @Override public void onEntryUnswapped(int part,
-        KeyCacheObject key,
-        GridCacheSwapEntry swapEntry)
-    {
+    @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Received unswapped event for key: " + key);
 
@@ -58,7 +80,27 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
         info.version(swapEntry.version());
         info.value(swapEntry.value());
 
+        notFinishedSwappedEntries.put(key, info);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onEntryUnswapped(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) {
+        // The entry was registered as in-flight by onEntryUnswapping(); move it to the completed map.
+        GridCacheEntryInfo finished = notFinishedSwappedEntries.remove(key);
+
+        assert finished != null;
+
+        swappedEntries.put(key, finished);
+
+        lock.lock();
+        try {
+            // Wake up threads blocked in waitUnswapFinished() once nothing is in flight.
+            if (notFinishedSwappedEntries.isEmpty())
+                emptyCond.signalAll();
+        }
+        finally {
+            lock.unlock();
+        }
+    }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
index 811b951f..d8d0ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
@@ -29,5 +29,13 @@ public interface GridCacheSwapListener {
      * @param e Entry.
      * @throws IgniteCheckedException If failed.
      */
+    public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
+
+    /**
+     * @param part Partition.
+     * @param key Cache key.
+     * @param e Entry.
+     * @throws IgniteCheckedException If failed.
+     */
     public void onEntryUnswapped(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..fed83de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -580,7 +580,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             part,
             key.valueBytes(cctx.cacheObjectContext()));
 
-        swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
+        for (GridCacheSwapListener lsnr : swapLsnrs.get(part))
+            lsnr.onEntryUnswapping(part, key, entry);
+
+        swapMgr.remove(spaceName, swapKey,  new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
                 if (rmv != null) {
                     try {
@@ -1925,6 +1928,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
+        @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e)
+            throws IgniteCheckedException {
+            // No-op: this listener takes no action on the unswapping notification.
+        }
+
+        /** {@inheritDoc} */
         @Override public void onEntryUnswapped(int part,
             KeyCacheObject key,
             GridCacheSwapEntry e) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..39fd9ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -410,6 +410,7 @@ class GridDhtPartitionSupplyPool<K, V> {
 
                         // Stop receiving promote notifications.
                         if (swapLsnr != null) {
+                            swapLsnr.waitUnswapFinished();
                             cctx.swap().removeOffHeapListener(part, swapLsnr);
                             cctx.swap().removeSwapListener(part, swapLsnr);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
index 57c3b9e..18245db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
@@ -42,7 +42,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  */
 public class GridCacheSwapPreloadSelfTest extends GridCommonAbstractTest {
     /** Entry count. */
-    private static final int ENTRY_CNT = 15_000;
+    private static final int ENTRY_CNT = 150_000;
 
     /** */
     private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);