You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/11 22:16:28 UTC

[1/2] ignite git commit: IGNITE-10207 rework lossPolicy tests

Repository: ignite
Updated Branches:
  refs/heads/ignite-10207 [created] 19d8d902f


IGNITE-10207 rework lossPolicy tests

Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/162e5738
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/162e5738
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/162e5738

Branch: refs/heads/ignite-10207
Commit: 162e57381f1edd3f3b3a252894fe40231c332474
Parents: 70952fe
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Sat Nov 10 01:09:12 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Sat Nov 10 01:09:12 2018 +0300

----------------------------------------------------------------------
 ...CacheResultIsNotNullOnPartitionLossTest.java |  23 +-
 .../IgniteCachePartitionLossPolicySelfTest.java | 652 +++++++------------
 2 files changed, 238 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/162e5738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
index ceafc9e..0958f83 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java
@@ -17,15 +17,21 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.EventType;
@@ -48,13 +54,13 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
     private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** Number of servers to be started. */
-    private static final int SERVERS = 10;
+    private static final int SERVERS = 5;
 
     /** Index of node that is goning to be the only client node. */
     private static final int CLIENT_IDX = SERVERS;
 
     /** Number of cache entries to insert into the test cache. */
-    private static final int CACHE_ENTRIES_CNT = 10_000;
+    private static final int CACHE_ENTRIES_CNT = 60;
 
     /** True if {@link #getConfiguration(String)} is expected to configure client node on next invocations. */
     private boolean isClient;
@@ -75,6 +81,7 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
                 .setCacheMode(CacheMode.PARTITIONED)
                 .setBackups(0)
                 .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setAffinity(new RendezvousAffinityFunction(false, 50))
                 .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
         );
 
@@ -90,7 +97,12 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
 
         cleanPersistenceDir();
 
-        startGrids(SERVERS);
+        List<Integer> list = IntStream.range(0, SERVERS).boxed().collect(Collectors.toList());
+
+        Collections.shuffle(list);
+
+        for (Integer i : list)
+            startGrid(i);
 
         isClient = true;
 
@@ -178,9 +190,9 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
             readerThreadStarted.await(1, TimeUnit.SECONDS);
 
             for (int i = 0; i < SERVERS - 1; i++) {
-                Thread.sleep(50L);
-
                 grid(i).close();
+
+                Thread.sleep(400L);
             }
         }
         finally {
@@ -204,6 +216,7 @@ public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractT
     private boolean expectedThrowableClass(Throwable throwable) {
         return X.hasCause(
             throwable,
+            IgniteClientDisconnectedException.class,
             CacheInvalidStateException.class,
             ClusterTopologyCheckedException.class,
             IllegalStateException.class,

http://git-wip-us.apache.org/repos/asf/ignite/blob/162e5738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index caf0829..f02563d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -18,14 +18,11 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
@@ -39,8 +36,6 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.Event;
@@ -48,7 +43,7 @@ import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.TestDelayingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.F;
@@ -58,18 +53,19 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static java.util.Arrays.asList;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
 
 /**
  *
  */
 public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTest {
     /** */
-    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
     private boolean client;
@@ -78,43 +74,36 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     private PartitionLossPolicy partLossPlc;
 
     /** */
-    protected static final String CACHE_NAME = "partitioned";
+    private int backups;
 
     /** */
-    private int backups = 0;
+    private final AtomicBoolean delayPartExchange = new AtomicBoolean();
 
     /** */
-    private final AtomicBoolean delayPartExchange = new AtomicBoolean(false);
-
-    /** */
-    private final TopologyChanger killSingleNode = new TopologyChanger(false, Collections.singletonList(3), Arrays.asList(0, 1, 2, 4), 0);
-
-    /** */
-    private boolean isPersistenceEnabled;
+    private final TopologyChanger killSingleNode = new TopologyChanger(
+        false, asList(3), asList(0, 1, 2, 4), 0
+    );
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
 
         cfg.setCommunicationSpi(new TestDelayingCommunicationSpi() {
+            /** {@inheritDoc} */
             @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) {
-                return delayPartExchange.get() && (msg instanceof GridDhtPartitionsFullMessage || msg instanceof GridDhtPartitionsAbstractMessage);
+                return delayPartExchange.get() &&
+                    (msg instanceof GridDhtPartitionsFullMessage || msg instanceof GridDhtPartitionsAbstractMessage);
             }
 
-            @Override protected int delayMillis() {
-                return 250;
-            }
         });
 
         cfg.setClientMode(client);
 
-        cfg.setCacheConfiguration(cacheConfiguration());
+        cfg.setConsistentId(gridName);
 
-        cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(
-            new DataRegionConfiguration().setPersistenceEnabled(isPersistenceEnabled)
-        ));
+        cfg.setCacheConfiguration(cacheConfiguration());
 
         return cfg;
     }
@@ -123,7 +112,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
      * @return Cache configuration.
      */
     protected CacheConfiguration<Integer, Integer> cacheConfiguration() {
-        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(CACHE_NAME);
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
         cacheCfg.setCacheMode(PARTITIONED);
         cacheCfg.setBackups(backups);
@@ -135,44 +124,27 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        delayPartExchange.set(false);
-
-        partLossPlc = PartitionLossPolicy.IGNORE;
-
-        backups = 0;
-
-        isPersistenceEnabled = false;
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
 
         cleanPersistenceDir();
 
-        super.afterTest();
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testReadOnlySafe() throws Exception {
-        partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
+        delayPartExchange.set(false);
 
-        checkLostPartition(false, true, killSingleNode);
+        backups = 0;
     }
 
     /**
      * @throws Exception if failed.
      */
-    public void testReadOnlySafeWithPersistence() throws Exception {
+    public void testReadOnlySafe() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
 
-        isPersistenceEnabled = true;
-
         checkLostPartition(false, true, killSingleNode);
     }
 
@@ -188,19 +160,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
-    public void testReadOnlyAllWithPersistence() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
-        partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(false, false, killSingleNode);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
     public void testReadWriteSafe() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -210,17 +169,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
-    public void testReadWriteSafeWithPersistence() throws Exception {
-        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, true, killSingleNode);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
     public void testReadWriteAll() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
 
@@ -230,34 +178,10 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
-    public void testReadWriteAllWithPersistence() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
-        partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, false, killSingleNode);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
     public void testReadWriteSafeAfterKillTwoNodes() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
-        checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testReadWriteSafeAfterKillTwoNodesWithPersistence() throws Exception {
-        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
+        checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 0));
     }
 
     /**
@@ -266,18 +190,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     public void testReadWriteSafeAfterKillTwoNodesWithDelay() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
-        checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testReadWriteSafeAfterKillTwoNodesWithDelayWithPersistence() throws Exception {
-        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20));
+        checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 20));
     }
 
     /**
@@ -288,22 +201,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
 
         backups = 1;
 
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-10043");
-
-        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
-        backups = 1;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0));
+        checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2, 1), asList(0, 4), 0));
     }
 
     /**
@@ -312,18 +210,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     public void testReadWriteSafeAfterKillCrd() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testReadWriteSafeAfterKillCrdWithPersistence() throws Exception {
-        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
+        checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
     }
 
     /**
@@ -334,20 +221,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
 
         backups = 1;
 
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testReadWriteSafeWithBackupsWithPersistence() throws Exception {
-        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
-        backups = 1;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
+        checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2), asList(0, 1, 4), 0));
     }
 
     /**
@@ -358,95 +232,26 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
 
         backups = 1;
 
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testReadWriteSafeWithBackupsAfterKillCrdWithPersistence() throws Exception {
-        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
-
-        backups = 1;
-
-        isPersistenceEnabled = true;
-
-        checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testIgnore() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-5078");
-
-        partLossPlc = PartitionLossPolicy.IGNORE;
-
-        checkIgnore(killSingleNode);
+        checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
     }
 
     /**
+     * @param topChanger topology changer.
      * @throws Exception if failed.
      */
-    public void testIgnoreWithPersistence() throws Exception {
+    public void testIgnore(TopologyChanger topChanger) throws Exception {
         fail("https://issues.apache.org/jira/browse/IGNITE-5078");
 
-        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
-        partLossPlc = PartitionLossPolicy.IGNORE;
-
-        isPersistenceEnabled = true;
-
-        checkIgnore(killSingleNode);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testIgnoreKillThreeNodes() throws Exception {
-        partLossPlc = PartitionLossPolicy.IGNORE;
-
-        // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
-        // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
-        // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
-        TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0);
-
-        checkIgnore(onlyCrdIsAlive);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testIgnoreKillThreeNodesWithPersistence() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
-
-        partLossPlc = PartitionLossPolicy.IGNORE;
-
-        isPersistenceEnabled = true;
-
-        // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
-        // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
-        // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
-        TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0);
-
-        checkIgnore(onlyCrdIsAlive);
-    }
-
-    /**
-     * @param topChanger topology changer.
-     * @throws Exception if failed.
-     */
-    private void checkIgnore(TopologyChanger topChanger) throws Exception {
         topChanger.changeTopology();
 
         for (Ignite ig : G.allGrids()) {
-            IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+            IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
 
             Collection<Integer> lost = cache.lostPartitions();
 
             assertTrue("[grid=" + ig.name() + ", lost=" + lost.toString() + ']', lost.isEmpty());
 
-            int parts = ig.affinity(CACHE_NAME).partitions();
+            int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
 
             for (int i = 0; i < parts; i++) {
                 cache.get(i);
@@ -465,127 +270,109 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
     private void checkLostPartition(boolean canWrite, boolean safe, TopologyChanger topChanger) throws Exception {
         assert partLossPlc != null;
 
-        List<Integer> lostParts = topChanger.changeTopology();
-
-        // Wait for all grids (servers and client) have same topology version
-        // to make sure that all nodes received map with lost partition.
-        boolean success = GridTestUtils.waitForCondition(() -> {
-            AffinityTopologyVersion last = null;
-            for (Ignite ig : G.allGrids()) {
-                AffinityTopologyVersion ver = ((IgniteEx)ig).context().cache().context().exchange().readyAffinityVersion();
-
-                if (last != null && !last.equals(ver))
-                    return false;
-
-                last = ver;
-            }
-
-            return true;
-        }, 10000);
-
-        assertTrue("Failed to wait for new topology", success);
+        int part = topChanger.changeTopology().get(0);
 
         for (Ignite ig : G.allGrids()) {
             info("Checking node: " + ig.cluster().localNode().id());
 
-            IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+            IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
 
-            verifyLostPartitions(ig, lostParts);
-
-            verifyCacheOps(canWrite, safe, ig);
-
-            validateQuery(safe, ig);
+            verifyCacheOps(canWrite, safe, part, ig);
 
-            // TODO withPartitionRecover doesn't work with BLT - https://issues.apache.org/jira/browse/IGNITE-10041.
-            if (!isPersistenceEnabled) {
-                // Check we can read and write to lost partition in recovery mode.
-                IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
+            // Check we can read and write to lost partition in recovery mode.
+            IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
 
-                for (int lostPart : recoverCache.lostPartitions()) {
-                    recoverCache.get(lostPart);
-                    recoverCache.put(lostPart, lostPart);
-                }
-
-                // Check that writing in recover mode does not clear partition state.
-                verifyLostPartitions(ig, lostParts);
+            for (int lostPart : recoverCache.lostPartitions()) {
+                recoverCache.get(lostPart);
+                recoverCache.put(lostPart, lostPart);
+            }
 
-                verifyCacheOps(canWrite, safe, ig);
+            // Check that writing in recover mode does not clear partition state.
+            verifyCacheOps(canWrite, safe, part, ig);
 
-                validateQuery(safe, ig);
-            }
+            // Validate queries.
+            validateQuery(safe, ig);
         }
 
-        // Bring all nodes back.
-        for (int i : topChanger.killNodes) {
-            IgniteEx grd = startGrid(i);
+        checkNewNode(true, canWrite, safe, part);
+        checkNewNode(false, canWrite, safe, part);
 
-            info("Newly started node: " + grd.cluster().localNode().id());
+        // Check that partition state does not change after we start a new node.
+        IgniteEx grd = startGrid(3);
 
-            // Check that partition state does not change after we start each node.
-            // TODO With persistence enabled LOST partitions become OWNING after a node joins back - https://issues.apache.org/jira/browse/IGNITE-10044.
-            if (!isPersistenceEnabled) {
-                for (Ignite ig : G.allGrids()) {
-                    verifyCacheOps(canWrite, safe, ig);
+        info("Newly started node: " + grd.cluster().localNode().id());
 
-                    // TODO Query effectively waits for rebalance due to https://issues.apache.org/jira/browse/IGNITE-10057
-                    // TODO and after resetLostPartition there is another OWNING copy in the cluster due to https://issues.apache.org/jira/browse/IGNITE-10058.
-                    // TODO Uncomment after https://issues.apache.org/jira/browse/IGNITE-10058 is fixed.
-//                    validateQuery(safe, ig);
-                }
-            }
-        }
+        for (Ignite ig : G.allGrids())
+            verifyCacheOps(canWrite, safe, part, ig);
 
-        ignite(4).resetLostPartitions(Collections.singletonList(CACHE_NAME));
+        ignite(4).resetLostPartitions(Collections.singletonList(DEFAULT_CACHE_NAME));
 
         awaitPartitionMapExchange(true, true, null);
 
         for (Ignite ig : G.allGrids()) {
-            IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+            IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
 
             assertTrue(cache.lostPartitions().isEmpty());
 
-            int parts = ig.affinity(CACHE_NAME).partitions();
+            int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
 
             for (int i = 0; i < parts; i++) {
                 cache.get(i);
 
                 cache.put(i, i);
             }
-
-            for (int i = 0; i < parts; i++) {
-                checkQueryPasses(ig, false, i);
-
-                if (shouldExecuteLocalQuery(ig, i))
-                    checkQueryPasses(ig, true, i);
-
-            }
-
-            checkQueryPasses(ig, false);
         }
     }
 
     /**
-     * @param node Node.
-     * @param lostParts Lost partition IDs.
+     * @param client Client flag.
+     * @param canWrite Can write flag.
+     * @param safe Safe flag.
+     * @param part Lost partition ID.
+     * @throws Exception If failed to start a new node.
      */
-    private void verifyLostPartitions(Ignite node, List<Integer> lostParts) {
-        IgniteCache<Integer, Integer> cache = node.cache(CACHE_NAME);
+    private void checkNewNode(
+        boolean client,
+        boolean canWrite,
+        boolean safe,
+        int part
+    ) throws Exception {
+        this.client = client;
+
+        try {
+            IgniteEx cl = startGrid("newNode");
+
+            CacheGroupContext grpCtx = cl.context().cache().cacheGroup(cacheId(DEFAULT_CACHE_NAME));
+
+            assertTrue(grpCtx.needsRecovery());
 
-        Set<Integer> actualSortedLostParts = new TreeSet<>(cache.lostPartitions());
-        Set<Integer> expSortedLostParts = new TreeSet<>(lostParts);
+            verifyCacheOps(canWrite, safe, part, cl);
 
-        assertEqualsCollections(expSortedLostParts, actualSortedLostParts);
+            validateQuery(safe, cl);
+        }
+        finally {
+            stopGrid("newNode", false);
+
+            this.client = false;
+        }
     }
 
     /**
+     *
      * @param canWrite {@code True} if writes are allowed.
      * @param safe {@code True} if lost partition should trigger exception.
+     * @param part Lost partition ID.
      * @param ig Ignite instance.
      */
-    private void verifyCacheOps(boolean canWrite, boolean safe, Ignite ig) {
-        IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
+    private void verifyCacheOps(boolean canWrite, boolean safe, int part, Ignite ig) {
+        IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
 
-        int parts = ig.affinity(CACHE_NAME).partitions();
+        Collection<Integer> lost = cache.lostPartitions();
+
+        assertTrue("Failed to find expected lost partition [exp=" + part + ", lost=" + lost + ']',
+            lost.contains(part));
+
+        int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
 
         // Check read.
         for (int i = 0; i < parts; i++) {
@@ -632,8 +419,8 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
      * @param nodes List of nodes to find partition.
      * @return List of partitions that aren't primary or backup for specified nodes.
      */
-    private List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
-        Affinity<Object> aff = ignite(4).affinity(CACHE_NAME);
+    protected List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
+        Affinity<Object> aff = ignite(4).affinity(DEFAULT_CACHE_NAME);
 
         List<Integer> parts = new ArrayList<>();
 
@@ -657,6 +444,127 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
         return parts;
     }
 
+    /** Starts a test topology, stops the {@code killNodes} and checks partition-lost events on {@code aliveNodes}. */
+    private class TopologyChanger {
+        /** Whether to set {@code delayPartExchange} before the nodes are stopped. */
+        private boolean delayExchange;
+
+        /** Indexes of the grid nodes to stop. */
+        private List<Integer> killNodes;
+
+        /** Indexes of the grid nodes that stay alive and get a partition-lost event listener. */
+        private List<Integer> aliveNodes;
+
+        /** Pause, in milliseconds, after each node stop is submitted. */
+        private long stopDelay;
+
+        /**
+         * @param delayExchange Whether to set {@code delayPartExchange} before stopping nodes.
+         * @param killNodes Indexes of the grid nodes to stop.
+         * @param aliveNodes Indexes of the grid nodes that stay alive.
+         * @param stopDelay Pause, in milliseconds, after each node stop is submitted.
+         */
+        public TopologyChanger(
+            boolean delayExchange,
+            List<Integer> killNodes,
+            List<Integer> aliveNodes,
+            long stopDelay
+        ) {
+            this.delayExchange = delayExchange;
+            this.killNodes = killNodes;
+            this.aliveNodes = aliveNodes;
+            this.stopDelay = stopDelay;
+        }
+
+        /**
+         * @return IDs of the partitions that have no primary or backup copy on {@code aliveNodes}.
+         * @throws Exception If failed.
+         */
+        protected List<Integer> changeTopology() throws Exception {
+            startGrids(4);
+
+            Affinity<Object> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
+
+            for (int i = 0; i < aff.partitions(); i++)
+                ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i);
+
+            client = true;
+
+            startGrid(4);
+
+            client = false;
+
+            for (int i = 0; i < 5; i++)
+                info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
+
+            awaitPartitionMapExchange();
+
+            final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes);
+
+            if (parts.isEmpty())
+                throw new IllegalStateException("No partition on nodes: " + killNodes);
+
+            final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
+
+            for (int i : aliveNodes) {
+                HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
+
+                for (Integer part : parts)
+                    semaphoreMap.put(part, new Semaphore(0));
+
+                lostMap.add(semaphoreMap);
+
+                grid(i).events().localListen(new P1<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+
+                        CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
+
+                        if (F.eq(DEFAULT_CACHE_NAME, cacheEvt.cacheName())) {
+                            if (semaphoreMap.containsKey(cacheEvt.partition()))
+                                semaphoreMap.get(cacheEvt.partition()).release();
+                        }
+
+                        return true;
+                    }
+                }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
+            }
+
+            if (delayExchange)
+                delayPartExchange.set(true);
+
+            ExecutorService executor = Executors.newFixedThreadPool(killNodes.size());
+
+            for (Integer node : killNodes) {
+                executor.submit(new Runnable() {
+                    @Override public void run() {
+                        grid(node).close();
+                    }
+                });
+
+                Thread.sleep(stopDelay);
+            }
+
+            executor.shutdown();
+
+            delayPartExchange.set(false);
+
+            Thread.sleep(5_000L);
+
+            for (Map<Integer, Semaphore> map : lostMap) {
+                for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+                    assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+            }
+
+            for (Map<Integer, Semaphore> map : lostMap) {
+                for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+                    assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+            }
+
+            return parts;
+        }
+    }
+
     /**
      * Validate query execution on a node.
      *
@@ -665,7 +573,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
      */
     private void validateQuery(boolean safe, Ignite node) {
         // Get node lost and remaining partitions.
-        IgniteCache<?, ?> cache = node.cache(CACHE_NAME);
+        IgniteCache<?, ?> cache = node.cache(DEFAULT_CACHE_NAME);
 
         Collection<Integer> lostParts = cache.lostPartitions();
 
@@ -673,7 +581,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
 
         Integer remainingPart = null;
 
-        for (int i = 0; i < node.affinity(CACHE_NAME).partitions(); i++) {
+        for (int i = 0; i < node.affinity(DEFAULT_CACHE_NAME).partitions(); i++) {
             if (lostParts.contains(i))
                 continue;
 
@@ -730,7 +638,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
 
         int numOfPrimaryParts = 0;
 
-        for (int nodePrimaryPart : node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode())) {
+        for (int nodePrimaryPart : node.affinity(DEFAULT_CACHE_NAME).primaryPartitions(node.cluster().localNode())) {
             for (int part : parts) {
                 if (part == nodePrimaryPart)
                     numOfPrimaryParts++;
@@ -754,7 +662,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
         if (loc)
             return;
 
-        IgniteCache cache = node.cache(CACHE_NAME);
+        IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
 
         ScanQuery qry = new ScanQuery();
 
@@ -777,124 +685,4 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
         // TODO Need to add an actual check after https://issues.apache.org/jira/browse/IGNITE-9902 is fixed.
         // No-op.
     }
-
-    /** */
-    private class TopologyChanger {
-        /** Flag to delay partition exchange */
-        private boolean delayExchange;
-
-        /** List of nodes to kill */
-        private List<Integer> killNodes;
-
-        /** List of nodes to be alive */
-        private List<Integer> aliveNodes;
-
-        /** Delay between node stops */
-        private long stopDelay;
-
-        /**
-         * @param delayExchange Flag for delay partition exchange.
-         * @param killNodes List of nodes to kill.
-         * @param aliveNodes List of nodes to be alive.
-         * @param stopDelay Delay between stopping nodes.
-         */
-        private TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes,
-            long stopDelay) {
-            this.delayExchange = delayExchange;
-            this.killNodes = killNodes;
-            this.aliveNodes = aliveNodes;
-            this.stopDelay = stopDelay;
-        }
-
-        /**
-         * @return Lost partition ID.
-         * @throws Exception If failed.
-         */
-        private List<Integer> changeTopology() throws Exception {
-            startGrids(4);
-
-            if (isPersistenceEnabled)
-                grid(0).cluster().active(true);
-
-            Affinity<Object> aff = ignite(0).affinity(CACHE_NAME);
-
-            for (int i = 0; i < aff.partitions(); i++)
-                ignite(0).cache(CACHE_NAME).put(i, i);
-
-            client = true;
-
-            startGrid(4);
-
-            client = false;
-
-            for (int i = 0; i < 5; i++)
-                info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
-
-            awaitPartitionMapExchange();
-
-            final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes);
-
-            if (parts.isEmpty())
-                throw new IllegalStateException("No partition on nodes: " + killNodes);
-
-            final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
-
-            for (int i : aliveNodes) {
-                HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
-
-                for (Integer part : parts)
-                    semaphoreMap.put(part, new Semaphore(0));
-
-                lostMap.add(semaphoreMap);
-
-                grid(i).events().localListen(new P1<Event>() {
-                    @Override public boolean apply(Event evt) {
-                        assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
-
-                        CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
-
-                        if (F.eq(CACHE_NAME, cacheEvt.cacheName())) {
-                            if (semaphoreMap.containsKey(cacheEvt.partition()))
-                                semaphoreMap.get(cacheEvt.partition()).release();
-                        }
-
-                        return true;
-                    }
-                }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
-            }
-
-            if (delayExchange)
-                delayPartExchange.set(true);
-
-            ExecutorService executor = Executors.newFixedThreadPool(killNodes.size());
-
-            for (Integer node : killNodes) {
-                executor.submit(new Runnable() {
-                    @Override public void run() {
-                        grid(node).close();
-                    }
-                });
-
-                Thread.sleep(stopDelay);
-            }
-
-            executor.shutdown();
-
-            delayPartExchange.set(false);
-
-            Thread.sleep(5_000L);
-
-            for (Map<Integer, Semaphore> map : lostMap) {
-                for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
-                    assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
-            }
-
-            for (Map<Integer, Semaphore> map : lostMap) {
-                for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
-                    assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
-            }
-
-            return parts;
-        }
-    }
 }


[2/2] ignite git commit: IGNITE-10207 fix checking lost partition for many cases

Posted by dg...@apache.org.
IGNITE-10207 fix checking lost partition for many cases

Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19d8d902
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19d8d902
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19d8d902

Branch: refs/heads/ignite-10207
Commit: 19d8d902f6ee2fb77244549a1a891a901ea2386d
Parents: 162e573
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Mon Nov 12 01:16:18 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Mon Nov 12 01:16:18 2018 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtGetFuture.java |  28 ++-
 .../distributed/dht/GridDhtGetSingleFuture.java |  29 ++-
 .../dht/GridDhtTopologyFutureAdapter.java       | 230 ++++++++++++-------
 .../dht/GridPartitionedSingleGetFuture.java     |   7 +
 .../GridDhtPartitionsExchangeFuture.java        |   4 +-
 5 files changed, 205 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 0bdc6b1..20c94c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -37,8 +37,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -55,6 +57,9 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+
 /**
  *
  */
@@ -314,14 +319,31 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             if (part == null)
                 return false;
 
+            if (part.state() == GridDhtPartitionState.LOST && !recovery) {
+                Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+                if (error != null) {
+                    onDone(null, error);
+
+                    return false;
+                }
+            }
+
             if (parts == null || !F.contains(parts, part.id())) {
                 // By reserving, we make sure that partition won't be unloaded while processed.
                 if (part.reserve()) {
-                    parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
+                    if (part.state() == GridDhtPartitionState.OWNING || part.state() == GridDhtPartitionState.LOST) {
+                        parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
 
-                    parts[parts.length - 1] = part.id();
+                        parts[parts.length - 1] = part.id();
 
-                    return true;
+                        return true;
+                    }
+                    else {
+                        part.release();
+
+                        return false;
+                    }
                 }
                 else
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index ee46168..94f07e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,8 +35,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.LostPolicyValidator;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -47,6 +49,9 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.singleton;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.READ;
+
 /**
  *
  */
@@ -255,7 +260,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
         if (!map(key)) {
             retry = cctx.affinity().partition(key);
 
-            onDone((GridCacheEntryInfo)null);
+            if (!isDone())
+                onDone((GridCacheEntryInfo)null);
 
             return;
         }
@@ -285,11 +291,28 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
 
             assert this.part == -1;
 
+            if (part.state() == GridDhtPartitionState.LOST && !recovery) {
+                Throwable error = LostPolicyValidator.validate(cctx, key, READ, singleton(part.id()));
+
+                if (error != null) {
+                    onDone(null, error);
+
+                    return false;
+                }
+            }
+
             // By reserving, we make sure that partition won't be unloaded while processed.
             if (part.reserve()) {
-                this.part = part.id();
+                if (part.state() == GridDhtPartitionState.OWNING || part.state() == GridDhtPartitionState.LOST) {
+                    this.part = part.id();
+
+                    return true;
+                }
+                else {
+                    part.release();
 
-                return true;
+                    return false;
+                }
             }
             else
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
index 9214308..23d0524 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -29,12 +29,13 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
-import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter.OperationType.WRITE;
 
 /**
  *
@@ -42,7 +43,7 @@ import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
 public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion>
     implements GridDhtTopologyFuture {
     /** Cache groups validation results. */
-    protected volatile Map<Integer, CacheValidation> grpValidRes;
+    protected volatile Map<Integer, CacheGroupValidation> grpValidRes;
 
     /** Whether or not cluster is active. */
     protected volatile boolean clusterIsActive = true;
@@ -52,7 +53,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
      * @param topNodes Topology nodes.
      * @return Validation result.
      */
-    protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+    protected final CacheGroupValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
         Collection<Integer> lostParts = grp.isLocal() ?
             Collections.<Integer>emptyList() : grp.topology().lostPartitions();
 
@@ -65,11 +66,11 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
                 valid = validator.validate(topNodes);
         }
 
-        return new CacheValidation(valid, lostParts);
+        return new CacheGroupValidation(valid, lostParts);
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public final Throwable validateCache(
+    @Override public final @Nullable Throwable validateCache(
         GridCacheContext cctx,
         boolean recovery,
         boolean read,
@@ -87,115 +88,174 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff
             return new CacheInvalidStateException(
                 "Failed to perform cache operation (cluster is not activated): " + cctx.name());
 
+        if (!cctx.shared().kernalContext().state().publicApiActiveState(true))
+            return new CacheInvalidStateException(
+                "Failed to perform cache operation (cluster is not activated): " + cctx.name());
+
+        OperationType opType = read ? OperationType.READ : WRITE;
+
         CacheGroupContext grp = cctx.group();
 
-        PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
+        CacheGroupValidation validation = grpValidRes.get(grp.groupId());
 
-        if (grp.needsRecovery() && !recovery) {
-            if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
-                return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
-                    cctx.name());
-        }
+        if (validation == null)
+            return null;
 
-        if (cctx.shared().readOnlyMode() && !read)
-            return new IgniteCheckedException("Failed to perform cache operation (cluster is in read only mode)" );
+        if (opType == WRITE && !validation.isValid()) {
+            return new IgniteCheckedException("Failed to perform cache operation " +
+                "(cache topology is not valid): " + cctx.name());
+        }
 
-        if (grp.needsRecovery() || grp.topologyValidator() != null) {
-            CacheValidation validation = grpValidRes.get(grp.groupId());
+        if (recovery)
+            return null;
 
-            if (validation == null)
-                return null;
+        if (validation.hasLostPartitions()) {
+            if (key != null)
+                return LostPolicyValidator.validate(cctx, key, opType, validation.lostPartitions());
 
-            if (!validation.valid && !read)
-                return new IgniteCheckedException("Failed to perform cache operation " +
-                    "(cache topology is not valid): " + cctx.name());
+            if (keys != null)
+                return LostPolicyValidator.validate(cctx, keys, opType, validation.lostPartitions());
+        }
 
-            if (recovery || !grp.needsRecovery())
-                return null;
+        return null;
+    }
 
-            if (key != null) {
-                int p = cctx.affinity().partition(key);
+    /**
+     * Cache group validation result.
+     */
+    protected static class CacheGroupValidation {
+        /** Topology validation result. */
+        private final boolean valid;
 
-                CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p,
-                    validation.lostParts, partLossPlc);
+        /** Lost partitions on this topology version. */
+        private final Collection<Integer> lostParts;
 
-                if (ex != null)
-                    return ex;
-            }
+        /**
+         * @param valid Valid flag.
+         * @param lostParts Lost partitions.
+         */
+        private CacheGroupValidation(boolean valid, Collection<Integer> lostParts) {
+            this.valid = valid;
+            this.lostParts = lostParts;
+        }
 
-            if (keys != null) {
-                for (Object k : keys) {
-                    int p = cctx.affinity().partition(k);
+        /**
+         * @return True if valid, False if invalid.
+         */
+        public boolean isValid() {
+            return valid;
+        }
 
-                    CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p,
-                        validation.lostParts, partLossPlc);
+        /**
+         * @return True if lost partition is present, False if not.
+         */
+        public boolean hasLostPartitions() {
+            return !F.isEmpty(lostParts);
+        }
 
-                    if (ex != null)
-                        return ex;
-                }
-            }
+        /**
+         * @return Lost partition ID collection.
+         */
+        public Collection<Integer> lostPartitions() {
+            return lostParts;
         }
+    }
 
-        return null;
+    /**
+     *
+     */
+    public enum OperationType {
+        /**
+         * Read operation.
+         */
+        READ,
+        /**
+         * Write operation.
+         */
+        WRITE
     }
 
     /**
-     * @param cacheName Cache name.
-     * @param read Read flag.
-     * @param key Key to check.
-     * @param part Partition this key belongs to.
-     * @param lostParts Collection of lost partitions.
-     * @param plc Partition loss policy.
-     * @return Invalid state exception if this operation is disallowed.
+     * Lost policy validator.
      */
-    private CacheInvalidStateException validatePartitionOperation(
-        String cacheName,
-        boolean read,
-        Object key,
-        int part,
-        Collection<Integer> lostParts,
-        PartitionLossPolicy plc
-    ) {
-        if (lostParts.contains(part)) {
-            if (!read) {
-                assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE;
+    public static class LostPolicyValidator {
+        /**
+         *
+         */
+        public static Throwable validate(
+            GridCacheContext cctx,
+            Object key,
+            OperationType opType,
+            Collection<Integer> lostParts
+        ) {
+            CacheGroupContext grp = cctx.group();
+
+            PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+            int partition = cctx.affinity().partition(key);
+
+            return validate(cctx, key, partition, opType, lostPlc, lostParts);
+        }
+
+        /**
+         *
+         */
+        public static Throwable validate(
+            GridCacheContext cctx,
+            Collection<?> keys,
+            OperationType opType,
+            Collection<Integer> lostParts
+        ) {
+            CacheGroupContext grp = cctx.group();
 
-                if (plc == READ_WRITE_SAFE) {
+            PartitionLossPolicy lostPlc = grp.config().getPartitionLossPolicy();
+
+            for (Object key : keys) {
+                int partition = cctx.affinity().partition(key);
+
+                Throwable res = validate(cctx, key, partition, opType, lostPlc, lostParts);
+
+                if (res != null)
+                    return res;
+            }
+
+            return null;
+        }
+
+        /**
+         *
+         */
+        private static Throwable validate(
+            GridCacheContext cctx,
+            Object key,
+            int partition,
+            OperationType opType,
+            PartitionLossPolicy lostPlc,
+            Collection<Integer> lostParts
+        ) {
+            if (opType == WRITE) {
+                if (lostPlc == READ_ONLY_SAFE || lostPlc == READ_ONLY_ALL) {
+                    return new IgniteCheckedException(
+                        "Failed to write to cache (cache is moved to a read-only state): " + cctx.name()
+                    );
+                }
+
+                if (lostParts.contains(partition) && lostPlc == READ_WRITE_SAFE) {
                     return new CacheInvalidStateException("Failed to execute cache operation " +
                         "(all partition owners have left the grid, partition data has been lost) [" +
-                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+                        "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']');
                 }
             }
-            else {
-                // Read.
-                if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE)
+
+            if (opType == OperationType.READ) {
+                if (lostParts.contains(partition) && (lostPlc == READ_ONLY_SAFE || lostPlc == READ_WRITE_SAFE))
                     return new CacheInvalidStateException("Failed to execute cache operation " +
                         "(all partition owners have left the grid, partition data has been lost) [" +
-                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+                        "cacheName=" + cctx.name() + ", part=" + partition + ", key=" + key + ']'
+                    );
             }
-        }
-
-        return null;
-    }
-
-    /**
-     * Cache validation result.
-     */
-    protected static class CacheValidation {
-        /** Topology validation result. */
-        private boolean valid;
 
-        /** Lost partitions on this topology version. */
-        private Collection<Integer> lostParts;
-
-        /**
-         * @param valid Valid flag.
-         * @param lostParts Lost partitions.
-         */
-        private CacheValidation(boolean valid, Collection<Integer> lostParts) {
-            this.valid = valid;
-            this.lostParts = lostParts;
+            return null;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 4dc72b2..93cc2ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -764,6 +764,18 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     private void remap(final AffinityTopologyVersion topVer) {
         cctx.closures().runLocalSafe(new Runnable() {
             @Override public void run() {
+                // Validate the cache against the last finished exchange before remapping.
+                GridDhtTopologyFuture lastFut = cctx.shared().exchange().lastFinishedFuture();
+
+                Throwable error = lastFut.validateCache(cctx, recovery, true, key, null);
+
+                if (error != null) {
+                    onDone(error);
+
+                    // Validation error was passed to onDone(), so skip the remap.
+                    return;
+                }
+
                 map(topVer);
             }
         });

http://git-wip-us.apache.org/repos/asf/ignite/blob/19d8d902/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3702a51..c8471c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2077,10 +2077,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            if (serverNodeDiscoveryEvent())
+            if (serverNodeDiscoveryEvent() || localJoinExchange())
                 detectLostPartitions(res);
 
-            Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
+            Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups())
                 m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));