You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 16:12:33 UTC

[11/50] incubator-ignite git commit: IGNITE-136 Clear local store for entry from swap.

IGNITE-136 Clear local store for entry from swap.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f33c0748
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f33c0748
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f33c0748

Branch: refs/heads/ignite-334
Commit: f33c07486ff932672b838604af3acb67397f34fc
Parents: 6ba6090
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Feb 25 13:37:33 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Feb 25 13:44:46 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  |   9 +-
 .../GridCacheAbstractLocalStoreSelfTest.java    | 118 ++++++++++++++-----
 .../GridCachePartitionedLocalStoreSelfTest.java |   2 +-
 ...chePartitionedOffHeapLocalStoreSelfTest.java |  56 +++++++++
 .../GridCacheReplicatedLocalStoreSelfTest.java  |   2 +-
 ...ridCacheTxPartitionedLocalStoreSelfTest.java |   2 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 +-
 7 files changed, 157 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 93a6012..e3efade 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -488,6 +488,8 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti
         try {
             GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> it = cctx.swap().iterator(id, false);
 
+            boolean isLocStore = cctx.store().isLocalStore();
+
             if (it != null) {
                 // We can safely remove these values because no entries will be created for evicted partition.
                 while (it.hasNext()) {
@@ -498,6 +500,9 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti
                     K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader());
 
                     cctx.swap().remove(key, keyBytes);
+
+                    if (isLocStore)
+                        cctx.store().removeFromStore(null, key);
                 }
             }
         }
@@ -531,7 +536,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti
 
         boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_UNLOADED);
 
-        boolean locStore = cctx.store().isLocalStore();
+        boolean isLocStore = cctx.store().isLocalStore();
 
         for (Iterator<GridDhtCacheEntry<K, V>> it = map.values().iterator(); it.hasNext();) {
             GridDhtCacheEntry<K, V> cached = it.next();
@@ -540,7 +545,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti
                 if (cached.clearInternal(clearVer, swap)) {
                     it.remove();
 
-                    if (locStore)
+                    if (isLocStore)
                         cctx.store().removeFromStore(null, cached.key());
 
                     if (!cached.isInternal()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index ed1b889..9bf9bc4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import com.google.common.collect.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.*;
@@ -37,6 +38,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.cache.CacheMemoryMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CachePreloadMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
@@ -100,13 +102,16 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
         LOCAL_STORE_3.clear();
     }
 
+    /**
+     * 
+     */
     private CacheConfiguration cache(String gridName, String cacheName, int backups) {
         CacheConfiguration cacheCfg = new CacheConfiguration();
 
         cacheCfg.setName(cacheName);
         cacheCfg.setCacheMode(getCacheMode());
         cacheCfg.setAtomicityMode(getAtomicMode());
-        cacheCfg.setDistributionMode(getDisrtMode());
+        cacheCfg.setDistributionMode(getDistributionMode());
         cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
         cacheCfg.setPreloadMode(SYNC);
 
@@ -120,6 +125,11 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
         cacheCfg.setWriteThrough(true);
         cacheCfg.setReadThrough(true);
         cacheCfg.setBackups(backups);
+        cacheCfg.setOffHeapMaxMemory(0);
+        cacheCfg.setSwapEnabled(true);
+
+        if (isOffHeapTiredMode())
+            cacheCfg.setMemoryMode(OFFHEAP_TIERED);
 
         return cacheCfg;
     }
@@ -127,7 +137,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
     /**
      * @return Distribution mode.
      */
-    protected abstract CacheDistributionMode getDisrtMode();
+    protected abstract CacheDistributionMode getDistributionMode();
 
     /**
      * @return Cache atomicity mode.
@@ -139,6 +149,13 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
      */
     protected abstract CacheMode getCacheMode();
 
+    /**
+     * @return Cache memory mode.
+     */
+    protected boolean isOffHeapTiredMode() {
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
@@ -152,57 +169,47 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
 
         IgniteCache<Object, Object> cache = ignite1.jcache(null);
 
+        // Populate cache and check that local store has all value.
         for (int i = 0; i < KEYS; i++)
             cache.put(i, i);
 
-        for (int i = 0; i < KEYS; i++)
-            assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i);
+        checkLocalStore(ignite1, LOCAL_STORE_1);
 
-        final CountDownLatch startPartExchange = new CountDownLatch(1);
-        final AtomicBoolean eventOcr = new AtomicBoolean(true);
+        final CountDownLatch partExchanged = new CountDownLatch(1);
+
+        final int[] leftPartition = new int[1];
 
         if (getCacheMode() != REPLICATED) {
             ignite1.events().localListen(new IgnitePredicate<Event>() {
+                private AtomicInteger eventCnt = new AtomicInteger(0);
+                
                 @Override public boolean apply(Event event) {
-                    startPartExchange.countDown();
-
-                    eventOcr.set(true);
-
+                    if (leftPartition[0] - eventCnt.incrementAndGet() == 0)
+                        partExchanged.countDown();
+                                            
                     return true;
                 }
-            }, EventType.EVT_CACHE_PRELOAD_OBJECT_UNLOADED);
+            }, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED);
         }
 
         Ignite ignite2 = startGrid(2);
 
+        // Partition count which must be transferred to 2'nd node.
+        leftPartition[0] = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length;
+
         assertEquals(Ignition.allGrids().size(), 2);
 
         // Wait when partition unloaded.
-        waitExpirePartition(startPartExchange, eventOcr);
+        if (getCacheMode() != REPLICATED)
+            assert partExchanged.await(2, TimeUnit.SECONDS);
 
         checkLocalStore(ignite1, LOCAL_STORE_1);
         checkLocalStore(ignite2, LOCAL_STORE_2);
     }
 
     /**
-     * Wait when partition unloaded.
+     * @throws Exception If failed.
      */
-    private void waitExpirePartition(CountDownLatch startPartExchange, AtomicBoolean eventOcr) throws Exception {
-        if (getCacheMode() != REPLICATED) {
-            assert startPartExchange.await(2, TimeUnit.SECONDS);
-
-            while (true) {
-                if (eventOcr.get()) {
-                    eventOcr.set(false);
-
-                    TimeUnit.MILLISECONDS.sleep(100);
-                }
-                else
-                    break;
-            }
-        }
-    }
-
     public void testBackupNode() throws Exception {
         Ignite ignite1 = startGrid(1);
 
@@ -241,6 +248,59 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testSwap() throws Exception {
+        Ignite ignite1 = startGrid(1);
+
+        IgniteCache<Object, Object> cache = ignite1.jcache(null);
+
+        // Populate cache and check that local store has all value.
+        for (int i = 0; i < KEYS; i++)
+            cache.put(i, i);
+
+        checkLocalStore(ignite1, LOCAL_STORE_1);
+
+        // Push entry to swap.
+        for (int i = 0; i < KEYS; i++)
+            cache.localEvict(Lists.newArrayList(i));
+
+        for (int i = 0; i < KEYS; i++)
+            assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
+
+        final CountDownLatch partExchanged = new CountDownLatch(1);
+
+        final int[] leftPartition = new int[1];
+
+        if (getCacheMode() != REPLICATED) {
+            ignite1.events().localListen(new IgnitePredicate<Event>() {
+                private AtomicInteger eventCnt = new AtomicInteger(0);
+
+                @Override public boolean apply(Event event) {
+                    if (leftPartition[0] - eventCnt.incrementAndGet() == 0)
+                        partExchanged.countDown();
+
+                    return true;
+                }
+            }, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED);
+        }
+
+        Ignite ignite2 = startGrid(2);
+
+        // Partition count which must be transferred to 2'nd node.
+        leftPartition[0] = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length;
+
+        assertEquals(Ignition.allGrids().size(), 2);
+
+        // Wait when partition unloaded.
+        if (getCacheMode() != REPLICATED)
+            assert partExchanged.await(2, TimeUnit.SECONDS);
+
+        checkLocalStore(ignite1, LOCAL_STORE_1);
+        checkLocalStore(ignite2, LOCAL_STORE_2);
+    }
+
+    /**
      * Check that local stores contains only primary entry.
      */
     private void checkLocalStore(Ignite ignite, CacheStore<Integer, IgniteBiTuple<Integer, ?>> store) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java
index d2dfcf0..4217531 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java
@@ -35,7 +35,7 @@ public class GridCachePartitionedLocalStoreSelfTest extends GridCacheAbstractLoc
     }
 
     /** {@inheritDoc} */
-    @Override protected CacheDistributionMode getDisrtMode() {
+    @Override protected CacheDistributionMode getDistributionMode() {
         return PARTITIONED_ONLY;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java
new file mode 100644
index 0000000..6dfc977
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class GridCachePartitionedOffHeapLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest {
+    /**
+     *
+     */
+    public GridCachePartitionedOffHeapLocalStoreSelfTest() {
+        super();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheDistributionMode getDistributionMode() {
+        return PARTITIONED_ONLY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode getAtomicMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode getCacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean isOffHeapTiredMode() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java
index 2d43d13..56f3f1a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java
@@ -35,7 +35,7 @@ public class GridCacheReplicatedLocalStoreSelfTest extends GridCacheAbstractLoca
     }
 
     /** {@inheritDoc} */
-    @Override protected CacheDistributionMode getDisrtMode() {
+    @Override protected CacheDistributionMode getDistributionMode() {
         return PARTITIONED_ONLY;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java
index e031102..113bac3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java
@@ -35,7 +35,7 @@ public class GridCacheTxPartitionedLocalStoreSelfTest extends GridCacheAbstractL
     }
 
     /** {@inheritDoc} */
-    @Override protected CacheDistributionMode getDisrtMode() {
+    @Override protected CacheDistributionMode getDistributionMode() {
         return PARTITIONED_ONLY;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 28c9b57..6b3eb4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -140,7 +140,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheGlobalLoadTest.class);
         suite.addTestSuite(GridCachePartitionedLocalStoreSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedLocalStoreSelfTest.class);
-        //suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); TODO GG-9762
+        suite.addTestSuite(GridCachePartitionedOffHeapLocalStoreSelfTest.class);
+        suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class);
 
         // Heuristic exception handling. TODO IGNITE-257
 //        suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class);