You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/14 15:58:56 UTC

[1/6] incubator-ignite git commit: #ignite-614: Add delay in FileSwapSpaceSpi.remove only for testing.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-614 [created] 37e60c1a0


#ignite-614: Add delay in FileSwapSpaceSpi.remove only for testing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ee180bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ee180bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ee180bc6

Branch: refs/heads/ignite-614
Commit: ee180bc6e77d5c74f9e27d4889ccf2e19684a924
Parents: 7e599b6
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 13:17:41 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 13:17:41 2015 +0300

----------------------------------------------------------------------
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |   7 ++
 .../cache/GridCacheSwapPreloadSelfTest.java     | 102 ++++++++++---------
 2 files changed, 59 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ee180bc6/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e7db285..e8e7191 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -392,6 +392,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         byte[] val = space.remove(key, c != null);
 
+        try {
+            U.sleep(1000);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            e.printStackTrace();
+        }
+
         if (c != null)
             c.apply(val);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ee180bc6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
index 6176da4..57c3b9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
@@ -156,83 +156,85 @@ public class GridCacheSwapPreloadSelfTest extends GridCommonAbstractTest {
         final AtomicBoolean done = new AtomicBoolean();
         IgniteInternalFuture<?> fut = null;
 
-        try {
-            startGrid(0);
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                startGrid(0);
 
-            final IgniteCache<Integer, Integer> cache = grid(0).cache(null);
+                final IgniteCache<Integer, Integer> cache = grid(0).cache(null);
 
-            assertNotNull(cache);
+                assertNotNull(cache);
 
-            // Populate.
-            for (int i = 0; i < ENTRY_CNT; i++)
-                cache.put(i, i);
+                // Populate.
+                for (int i = 0; i < ENTRY_CNT; i++)
+                    cache.put(i, i);
 
-            Set<Integer> keys = new HashSet<>();
+                Set<Integer> keys = new HashSet<>();
 
-            for (Cache.Entry<Integer, Integer> entry : cache.localEntries())
-                keys.add(entry.getKey());
+                for (Cache.Entry<Integer, Integer> entry : cache.localEntries())
+                    keys.add(entry.getKey());
 
-            cache.localEvict(keys);
+                cache.localEvict(keys);
+
+                fut = multithreadedAsync(new Callable<Object>() {
+                    @Nullable @Override public Object call() throws Exception {
+                        Random rnd = new Random();
 
-            fut = multithreadedAsync(new Callable<Object>() {
-                @Nullable @Override public Object call() throws Exception {
-                    Random rnd = new Random();
+                        while (!done.get()) {
+                            int key = rnd.nextInt(ENTRY_CNT);
 
-                    while (!done.get()) {
-                        int key = rnd.nextInt(ENTRY_CNT);
+                            Integer i = cache.get(key);
 
-                        Integer i = cache.get(key);
+                            assertNotNull(i);
+                            assertEquals(Integer.valueOf(key), i);
 
-                        assertNotNull(i);
-                        assertEquals(Integer.valueOf(key), i);
+                            //cache.localEvict(Collections.singleton(rnd.nextInt(ENTRY_CNT)));
+                        }
 
-                        cache.localEvict(Collections.singleton(rnd.nextInt(ENTRY_CNT)));
+                        return null;
                     }
+                }, 10);
 
-                    return null;
-                }
-            }, 10);
+                startGrid(1);
 
-            startGrid(1);
+                done.set(true);
 
-            done.set(true);
+                fut.get();
 
-            fut.get();
+                fut = null;
 
-            fut = null;
+                int size = grid(1).cache(null).localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP,
+                    CachePeekMode.NEAR, CachePeekMode.ONHEAP);
 
-            int size = grid(1).cache(null).localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP,
-                CachePeekMode.NEAR, CachePeekMode.ONHEAP);
+                info("New node cache size: " + size);
 
-            info("New node cache size: " + size);
+                if (size != ENTRY_CNT) {
+                    Set<Integer> keySet = new TreeSet<>();
 
-            if (size != ENTRY_CNT) {
-                Set<Integer> keySet = new TreeSet<>();
+                    int next = 0;
 
-                int next = 0;
+                    for (IgniteCache.Entry<Integer, Integer> e : grid(1).<Integer, Integer>cache(null).localEntries())
+                        keySet.add(e.getKey());
 
-                for (IgniteCache.Entry<Integer, Integer> e : grid(1).<Integer, Integer>cache(null).localEntries())
-                    keySet.add(e.getKey());
+                    for (Integer i : keySet) {
+                        while (next < i)
+                            info("Missing key: " + next++);
 
-                for (Integer i : keySet) {
-                    while (next < i)
-                        info("Missing key: " + next++);
-
-                    next++;
+                        next++;
+                    }
                 }
-            }
 
-            assertEquals(ENTRY_CNT, size);
-        }
-        finally {
-            done.set(true);
-
-            try {
-                if (fut != null)
-                    fut.get();
+                assertEquals(ENTRY_CNT, size);
             }
             finally {
-                stopAllGrids();
+                done.set(true);
+
+                try {
+                    if (fut != null)
+                        fut.get();
+                }
+                finally {
+                    stopAllGrids();
+                }
             }
         }
     }


[6/6] incubator-ignite git commit: #ignite-614: Remove code for test.

Posted by sb...@apache.org.
#ignite-614: Remove code for test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37e60c1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37e60c1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37e60c1a

Branch: refs/heads/ignite-614
Commit: 37e60c1a0e2e37b4f2d9b472339afb4982eacd53
Parents: 57d7fbd
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 16:54:30 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 16:54:30 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java    | 7 -------
 1 file changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37e60c1a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e8e7191..e7db285 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -392,13 +392,6 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         byte[] val = space.remove(key, c != null);
 
-        try {
-            U.sleep(1000);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            e.printStackTrace();
-        }
-
         if (c != null)
             c.apply(val);
 


[4/6] incubator-ignite git commit: #ignite-614: wip.

Posted by sb...@apache.org.
#ignite-614: wip.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dcac35ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dcac35ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dcac35ce

Branch: refs/heads/ignite-614
Commit: dcac35ce2662ef82eb62265f2b5ed7dd19d9ca46
Parents: 87c1275
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 16:08:12 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 16:08:12 2015 +0300

----------------------------------------------------------------------
 .../GridCacheEntryInfoCollectSwapListener.java  | 68 +++++++++++---------
 .../processors/cache/GridCacheSwapListener.java |  4 +-
 .../processors/cache/GridCacheSwapManager.java  | 11 ++--
 .../preloader/GridDhtPartitionSupplyPool.java   |  1 +
 4 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index bd1746d..9a7511c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.client.util.*;
 import org.jsr166.*;
 
 import java.util.*;
@@ -30,11 +32,14 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
     /** */
     private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new ConcurrentHashMap8<>();
 
-    private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo>  notFinishedSwappedEntries = new ConcurrentHashMap8<>();
+    /** Entries in swapping. */
+    private final GridConcurrentHashSet<KeyCacheObject> swappingKeys = new GridConcurrentHashSet();
 
+    /** Lock for empty condition. */
+    final Lock emptyLock = new ReentrantLock();
 
-    final Lock lock = new ReentrantLock();
-    final Condition emptyCond  = lock.newCondition();
+    /** Condition for empty swapping entries. */
+    final Condition emptyCond  = emptyLock.newCondition();
 
     /** */
     private final IgniteLogger log;
@@ -49,27 +54,37 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
     /**
      * Wait until all entries finish unswapping.
      */
-    public void waitUnswapFinished() {
-        lock.lock();
-        try{
-            if (notFinishedSwappedEntries.size() != 0)
-                try {
-                    emptyCond.await();
-                }
-                catch (InterruptedException e) {
-                    // No-op.
-                }
-        } finally {
-            lock.unlock();
+    public void waitUnswapFinished() throws IgniteCheckedException {
+        emptyLock.lock();
+
+        try {
+            if (swappingKeys.size() != 0)
+                emptyCond.await();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        finally {
+            emptyLock.unlock();
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) throws IgniteCheckedException {
+    @Override public void onEntryUnswapping(KeyCacheObject key) throws IgniteCheckedException {
         if (log.isDebugEnabled())
-            log.debug("Received unswapped event for key: " + key);
+            log.debug("Received unswapping event for key: " + key);
 
         assert key != null;
+
+        swappingKeys.add(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onEntryUnswapped(int part,
+        KeyCacheObject key,
+        GridCacheSwapEntry swapEntry)
+    {
+        assert key != null;
         assert swapEntry != null;
 
         GridCacheEntryInfo info = new GridCacheEntryInfo();
@@ -80,26 +95,17 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
         info.version(swapEntry.version());
         info.value(swapEntry.value());
 
-        notFinishedSwappedEntries.put(key, info);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onEntryUnswapped(int part,
-        KeyCacheObject key,
-        GridCacheSwapEntry swapEntry)
-    {
-        GridCacheEntryInfo info = notFinishedSwappedEntries.remove(key);
+        swappedEntries.put(key, info);
 
-        assert info != null;
+        swappingKeys.remove(key);
 
-        swappedEntries.put(key, info);
+        emptyLock.lock();
 
-        lock.lock();
         try{
-            if (notFinishedSwappedEntries.size() == 0)
+            if (swappingKeys.size() == 0)
                 emptyCond.signalAll();
         } finally {
-            lock.unlock();
+            emptyLock.unlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
index d8d0ddc..d6d13ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
@@ -24,12 +24,10 @@ import org.apache.ignite.*;
  */
 public interface GridCacheSwapListener {
     /**
-     * @param part Partition.
      * @param key Cache key.
-     * @param e Entry.
      * @throws IgniteCheckedException If failed.
      */
-    public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
+    public void onEntryUnswapping(KeyCacheObject key) throws IgniteCheckedException;
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index fed83de..c179b31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -580,8 +580,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             part,
             key.valueBytes(cctx.cacheObjectContext()));
 
-        for (GridCacheSwapListener lsnr : swapLsnrs.get(part))
-            lsnr.onEntryUnswapping(part, key, entry);
+        Collection<GridCacheSwapListener> lsnrs = swapLsnrs.get(part);
+
+        if (lsnrs != null) {
+            for (GridCacheSwapListener lsnr : lsnrs)
+                lsnr.onEntryUnswapping(key);
+        }
 
         swapMgr.remove(spaceName, swapKey,  new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
@@ -595,7 +599,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         t.set(entry);
 
                         CacheObject v = entry.value();
-                        byte[] valBytes = entry.valueBytes();
 
                         // Event notification.
                         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNSWAPPED)) {
@@ -1928,7 +1931,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e)
+        @Override public void onEntryUnswapping(KeyCacheObject key)
             throws IgniteCheckedException {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcac35ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 39fd9ac..3b93c09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -411,6 +411,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                         // Stop receiving promote notifications.
                         if (swapLsnr != null) {
                             swapLsnr.waitUnswapFinished();
+
                             cctx.swap().removeOffHeapListener(part, swapLsnr);
                             cctx.swap().removeSwapListener(part, swapLsnr);
                         }


[3/6] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-614

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-614


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/87c12750
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/87c12750
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/87c12750

Branch: refs/heads/ignite-614
Commit: 87c12750beb4b35b7fd7d82214593fd819e57f4f
Parents: 574d426 593e3ee
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:43:40 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:43:40 2015 +0300

----------------------------------------------------------------------
 bin/ignite-schema-import.bat                    |   2 +-
 bin/ignite-schema-import.sh                     |   2 +-
 bin/ignite.bat                                  |   2 +-
 bin/ignitevisorcmd.bat                          |   2 +-
 bin/ignitevisorcmd.sh                           |   2 +-
 .../processors/cache/GridCacheAdapter.java      | 119 +++++------
 .../GridDistributedCacheAdapter.java            | 210 ++++++++++++-------
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../cache/CacheRemoveAllSelfTest.java           |  81 +++++++
 .../near/NoneRebalanceModeSelfTest.java         |  67 ++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   1 +
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 12 files changed, 344 insertions(+), 148 deletions(-)
----------------------------------------------------------------------



[5/6] incubator-ignite git commit: #ignite-614: wip.

Posted by sb...@apache.org.
#ignite-614: wip.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/57d7fbd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/57d7fbd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/57d7fbd8

Branch: refs/heads/ignite-614
Commit: 57d7fbd8f084a43614c1100f2b5578a599d9b008
Parents: dcac35c
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 16:16:46 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 16:16:46 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryInfoCollectSwapListener.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57d7fbd8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index 9a7511c..9cf40cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -104,7 +104,8 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
         try{
             if (swappingKeys.size() == 0)
                 emptyCond.signalAll();
-        } finally {
+        }
+        finally {
             emptyLock.unlock();
         }
     }


[2/6] incubator-ignite git commit: #ignite-614: wip.

Posted by sb...@apache.org.
#ignite-614: wip.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/574d4269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/574d4269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/574d4269

Branch: refs/heads/ignite-614
Commit: 574d4269bf321d252a6221984833567fc44fe5b3
Parents: ee180bc
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:25:16 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:25:16 2015 +0300

----------------------------------------------------------------------
 .../GridCacheEntryInfoCollectSwapListener.java  | 50 ++++++++++++++++++--
 .../processors/cache/GridCacheSwapListener.java |  8 ++++
 .../processors/cache/GridCacheSwapManager.java  | 11 ++++-
 .../preloader/GridDhtPartitionSupplyPool.java   |  1 +
 .../cache/GridCacheSwapPreloadSelfTest.java     |  2 +-
 5 files changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
index e227b43..bd1746d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.jsr166.*;
 
 import java.util.*;
+import java.util.concurrent.locks.*;
 
 /**
  *
@@ -29,6 +30,12 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
     /** */
     private final Map<KeyCacheObject, GridCacheEntryInfo> swappedEntries = new ConcurrentHashMap8<>();
 
+    private final ConcurrentHashMap8<KeyCacheObject, GridCacheEntryInfo>  notFinishedSwappedEntries = new ConcurrentHashMap8<>();
+
+
+    final Lock lock = new ReentrantLock();
+    final Condition emptyCond  = lock.newCondition();
+
     /** */
     private final IgniteLogger log;
 
@@ -39,11 +46,26 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
         this.log = log;
     }
 
+    /**
+     * Wait until all entries finish unswapping.
+     */
+    public void waitUnswapFinished() {
+        lock.lock();
+        try{
+            if (notFinishedSwappedEntries.size() != 0)
+                try {
+                    emptyCond.await();
+                }
+                catch (InterruptedException e) {
+                    // No-op.
+                }
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /** {@inheritDoc} */
-    @Override public void onEntryUnswapped(int part,
-        KeyCacheObject key,
-        GridCacheSwapEntry swapEntry)
-    {
+    @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry swapEntry) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Received unswapped event for key: " + key);
 
@@ -58,7 +80,27 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe
         info.version(swapEntry.version());
         info.value(swapEntry.value());
 
+        notFinishedSwappedEntries.put(key, info);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onEntryUnswapped(int part,
+        KeyCacheObject key,
+        GridCacheSwapEntry swapEntry)
+    {
+        GridCacheEntryInfo info = notFinishedSwappedEntries.remove(key);
+
+        assert info != null;
+
         swappedEntries.put(key, info);
+
+        lock.lock();
+        try{
+            if (notFinishedSwappedEntries.size() == 0)
+                emptyCond.signalAll();
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
index 811b951f..d8d0ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java
@@ -29,5 +29,13 @@ public interface GridCacheSwapListener {
      * @param e Entry.
      * @throws IgniteCheckedException If failed.
      */
+    public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
+
+    /**
+     * @param part Partition.
+     * @param key Cache key.
+     * @param e Entry.
+     * @throws IgniteCheckedException If failed.
+     */
     public void onEntryUnswapped(int part, KeyCacheObject key, GridCacheSwapEntry e) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..fed83de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -580,7 +580,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             part,
             key.valueBytes(cctx.cacheObjectContext()));
 
-        swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
+        for (GridCacheSwapListener lsnr : swapLsnrs.get(part))
+            lsnr.onEntryUnswapping(part, key, entry);
+
+        swapMgr.remove(spaceName, swapKey,  new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
                 if (rmv != null) {
                     try {
@@ -1925,6 +1928,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
+        @Override public void onEntryUnswapping(int part, KeyCacheObject key, GridCacheSwapEntry e)
+            throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public void onEntryUnswapped(int part,
             KeyCacheObject key,
             GridCacheSwapEntry e) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..39fd9ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -410,6 +410,7 @@ class GridDhtPartitionSupplyPool<K, V> {
 
                         // Stop receiving promote notifications.
                         if (swapLsnr != null) {
+                            swapLsnr.waitUnswapFinished();
                             cctx.swap().removeOffHeapListener(part, swapLsnr);
                             cctx.swap().removeSwapListener(part, swapLsnr);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/574d4269/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
index 57c3b9e..18245db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapPreloadSelfTest.java
@@ -42,7 +42,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  */
 public class GridCacheSwapPreloadSelfTest extends GridCommonAbstractTest {
     /** Entry count. */
-    private static final int ENTRY_CNT = 15_000;
+    private static final int ENTRY_CNT = 150_000;
 
     /** */
     private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);