You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/18 16:05:48 UTC

[1/2] ignite git commit: ignite-1811 Optimized cache 'get' on affinity node.

Repository: ignite
Updated Branches:
  refs/heads/master d85616b9b -> 83b2bf5e1


http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
new file mode 100644
index 0000000..b14109b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+
+/**
+ *
+ */
+public class IgniteCacheGetRestartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final long TEST_TIME = 60_000;
+
+    /** */
+    private static final int SRVS = 3;
+
+    /** */
+    private static final int CLIENTS = 1;
+
+    /** */
+    private static final int KEYS = 100_000;
+
+    /** */
+    private ThreadLocal<Boolean> client = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        Boolean clientMode = client.get();
+
+        if (clientMode != null) {
+            cfg.setClientMode(clientMode);
+
+            client.remove();
+        }
+
+        cfg.setConsistentId(gridName);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(SRVS);
+
+        for (int i = 0; i < CLIENTS; i++) {
+            client.set(true);
+
+            Ignite client = startGrid(SRVS);
+
+            assertTrue(client.configuration().isClientMode());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIME + 60_000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetRestartReplicated() throws Exception {
+        CacheConfiguration<Object, Object> cache = cacheConfiguration(REPLICATED, 0, false);
+
+        checkRestart(cache, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetRestartPartitioned1() throws Exception {
+        CacheConfiguration<Object, Object> cache = cacheConfiguration(PARTITIONED, 1, false);
+
+        checkRestart(cache, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetRestartPartitioned2() throws Exception {
+        CacheConfiguration<Object, Object> cache = cacheConfiguration(PARTITIONED, 2, false);
+
+        checkRestart(cache, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetRestartPartitionedNearEnabled() throws Exception {
+        CacheConfiguration<Object, Object> cache = cacheConfiguration(PARTITIONED, 1, true);
+
+        checkRestart(cache, 1);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param restartCnt Number of nodes to restart.
+     * @throws Exception If failed.
+     */
+    private void checkRestart(final CacheConfiguration ccfg, final int restartCnt) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            if (ccfg.getNearConfiguration() != null)
+                ignite(SRVS).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+            try (IgniteDataStreamer<Object, Object> streamer = ignite(0).dataStreamer(ccfg.getName())) {
+                for (int i = 0; i < KEYS; i++)
+                    streamer.addData(i, i);
+            }
+
+            final long stopTime = U.currentTimeMillis() + TEST_TIME;
+
+            final AtomicInteger nodeIdx = new AtomicInteger();
+
+            IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    Ignite ignite = ignite(nodeIdx.getAndIncrement());
+
+                    log.info("Check get [node=" + ignite.name() +
+                        ", client=" + ignite.configuration().isClientMode() + ']');
+
+                    IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName());
+
+                    while (U.currentTimeMillis() < stopTime)
+                        checkGet(cache);
+
+                    return null;
+                }
+            }, SRVS + CLIENTS, "get-thread");
+
+            final AtomicInteger restartNodeIdx = new AtomicInteger(SRVS + CLIENTS);
+
+            final AtomicBoolean clientNode = new AtomicBoolean();
+
+            IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int nodeIdx = restartNodeIdx.getAndIncrement();
+
+                    boolean clientMode = clientNode.compareAndSet(false, true);
+
+                    while (U.currentTimeMillis() < stopTime) {
+                        if (clientMode)
+                            client.set(true);
+
+                        log.info("Restart node [node=" + nodeIdx + ", client=" + clientMode + ']');
+
+                        Ignite ignite = startGrid(nodeIdx);
+
+                        IgniteCache<Object, Object> cache;
+
+                        if (clientMode && ccfg.getNearConfiguration() != null)
+                            cache = ignite.createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+                        else
+                            cache = ignite.cache(ccfg.getName());
+
+                        checkGet(cache);
+
+                        IgniteInternalFuture<?> syncFut = ((IgniteCacheProxy)cache).context().preloader().syncFuture();
+
+                        while (!syncFut.isDone())
+                            checkGet(cache);
+
+                        checkGet(cache);
+
+                        stopGrid(nodeIdx);
+                    }
+
+                    return null;
+                }
+            }, restartCnt + 1, "restart-thread");
+
+            fut1.get();
+            fut2.get();
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void checkGet(IgniteCache<Object, Object> cache) {
+        for (int i = 0; i < KEYS; i++)
+            assertEquals(i, cache.get(i));
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < KEYS; i++) {
+            keys.add(i);
+
+            if (keys.size() == 100) {
+                Map<Object, Object> vals = cache.getAll(keys);
+
+                for (Object key : keys)
+                    assertEquals(key, vals.get(key));
+
+                keys.clear();
+            }
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param near If {@code true} near cache is enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode, int backups, boolean near) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(cacheMode);
+
+        if (cacheMode != REPLICATED)
+            ccfg.setBackups(backups);
+
+        if (near)
+            ccfg.setNearConfiguration(new NearCacheConfiguration<>());
+
+        ccfg.setRebalanceMode(ASYNC);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
new file mode 100644
index 0000000..af018cc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 4;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetFromBackupStoreReadThroughEnabled() throws Exception {
+        for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+
+            boolean near = (ccfg.getNearConfiguration() != null);
+
+            log.info("Test cache [mode=" + ccfg.getCacheMode() +
+                ", atomicity=" + ccfg.getAtomicityMode() +
+                ", backups=" + ccfg.getBackups() +
+                ", near=" + near + "]");
+
+            ignite(0).createCache(ccfg);
+
+            awaitPartitionMapExchange();
+
+            try {
+                for (int i = 0; i < NODES; i++) {
+                    Ignite ignite = ignite(i);
+
+                    log.info("Check node: " + ignite.name());
+
+                    IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
+                    TestRecordingCommunicationSpi spi = recordGetRequests(ignite, near);
+
+                    Integer key = backupKey(cache);
+
+                    assertNull(cache.get(key));
+
+                    List<Object> msgs = spi.recordedMessages();
+
+                    assertEquals(1, msgs.size());
+                }
+            }
+            finally {
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetFromBackupStoreReadThroughDisabled() throws Exception {
+        for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(false);
+
+            boolean near = (ccfg.getNearConfiguration() != null);
+
+            log.info("Test cache [mode=" + ccfg.getCacheMode() +
+                ", atomicity=" + ccfg.getAtomicityMode() +
+                ", backups=" + ccfg.getBackups() +
+                ", near=" + near + "]");
+
+            ignite(0).createCache(ccfg);
+
+            awaitPartitionMapExchange();
+
+            try {
+                checkLocalRead(NODES, ccfg);
+            }
+            finally {
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetFromPrimaryPreloadInProgress() throws Exception {
+        for (final CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) {
+            boolean near = (ccfg.getNearConfiguration() != null);
+
+            log.info("Test cache [mode=" + ccfg.getCacheMode() +
+                ", atomicity=" + ccfg.getAtomicityMode() +
+                ", backups=" + ccfg.getBackups() +
+                ", near=" + near + "]");
+
+            ignite(0).createCache(ccfg);
+
+            awaitPartitionMapExchange();
+
+            try {
+                Map<Ignite, Integer> backupKeys = new HashMap<>();
+                Map<Ignite, Integer> nearKeys = new HashMap<>();
+
+                for (int i = 0; i < NODES; i++) {
+                    Ignite ignite = ignite(i);
+
+                    IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
+                    backupKeys.put(ignite, backupKey(cache));
+
+                    if (ccfg.getCacheMode() == PARTITIONED)
+                        nearKeys.put(ignite, nearKey(cache));
+
+                    TestRecordingCommunicationSpi spi =
+                        (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+                    spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
+                        @Override public boolean apply(GridIoMessage ioMsg) {
+                            if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class))
+                                return false;
+
+                            GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message();
+
+                            return msg.cacheId() == CU.cacheId(ccfg.getName());
+                        }
+                    });
+                }
+
+                try (Ignite newNode = startGrid(NODES)) {
+                    IgniteCache<Integer, Integer> cache = newNode.cache(ccfg.getName());
+
+                    TestRecordingCommunicationSpi newNodeSpi = recordGetRequests(newNode, near);
+
+                    Integer key = backupKey(cache);
+
+                    assertNull(cache.get(key));
+
+                    List<Object> msgs = newNodeSpi.recordedMessages();
+
+                    assertEquals(1, msgs.size());
+
+                    for (int i = 0; i < NODES; i++) {
+                        Ignite ignite = ignite(i);
+
+                        log.info("Check node: " + ignite.name());
+
+                        checkLocalRead(ignite, ccfg, backupKeys.get(ignite), nearKeys.get(ignite));
+                    }
+
+                    for (int i = 0; i < NODES; i++) {
+                        Ignite ignite = ignite(i);
+
+                        TestRecordingCommunicationSpi spi =
+                            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+                        spi.stopBlock();
+                    }
+
+                    awaitPartitionMapExchange();
+
+                    checkLocalRead(NODES + 1, ccfg);
+                }
+            }
+            finally {
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoPrimaryReadPreloadFinished() throws Exception {
+        for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) {
+            boolean near = (ccfg.getNearConfiguration() != null);
+
+            log.info("Test cache [mode=" + ccfg.getCacheMode() +
+                ", atomicity=" + ccfg.getAtomicityMode() +
+                ", backups=" + ccfg.getBackups() +
+                ", near=" + near + "]");
+
+            ignite(0).createCache(ccfg);
+
+            awaitPartitionMapExchange();
+
+            try {
+                checkLocalRead(NODES, ccfg);
+            }
+            finally {
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @param nodes Number of nodes.
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void checkLocalRead(int nodes, CacheConfiguration<Object, Object> ccfg) throws Exception {
+        for (int i = 0; i < nodes; i++) {
+            Ignite ignite = ignite(i);
+
+            log.info("Check node: " + ignite.name());
+
+            IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
+            List<Integer> backupKeys = backupKeys(cache, 2, 0);
+
+            Integer backupKey = backupKeys.get(0);
+
+            Integer nearKey = ccfg.getCacheMode() == PARTITIONED ? nearKey(cache) : null;
+
+            checkLocalRead(ignite, ccfg, backupKey, nearKey);
+
+            Set<Integer> keys = new HashSet<>(backupKeys);
+
+            Map<Integer, Integer> vals = cache.getAll(keys);
+
+            for (Integer key : keys)
+                assertNull(vals.get(key));
+
+            TestRecordingCommunicationSpi spi =
+                (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+            List<Object> msgs = spi.recordedMessages();
+
+            assertEquals(0, msgs.size());
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     * @param ccfg Cache configuration.
+     * @param backupKey Backup key.
+     * @param nearKey Near key.
+     * @throws Exception If failed.
+     */
+    private void checkLocalRead(Ignite ignite,
+        CacheConfiguration<Object, Object> ccfg,
+        Integer backupKey,
+        Integer nearKey) throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
+        TestRecordingCommunicationSpi spi = recordGetRequests(ignite, ccfg.getNearConfiguration() != null);
+
+        List<Object> msgs;
+
+        if (nearKey != null) {
+            assertNull(cache.get(nearKey));
+
+            msgs = spi.recordedMessages();
+
+            assertEquals(1, msgs.size());
+        }
+
+        assertNull(cache.get(backupKey));
+
+        msgs = spi.recordedMessages();
+
+        assertTrue(msgs.isEmpty());
+    }
+
+    /**
+     * @param ignite Node.
+     * @param near Near cache flag.
+     * @return Communication SPI.
+     */
+    private TestRecordingCommunicationSpi recordGetRequests(Ignite ignite, boolean near) {
+        TestRecordingCommunicationSpi spi =
+            (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+        spi.record(near ? GridNearGetRequest.class : GridNearSingleGetRequest.class);
+
+        return spi;
+    }
+
+    /**
+     * @return Cache configurations to test.
+     */
+    private List<CacheConfiguration<Object, Object>> cacheConfigurations() {
+        List<CacheConfiguration<Object, Object>> ccfgs = new ArrayList<>();
+
+        ccfgs.add(cacheConfiguration(REPLICATED, ATOMIC, 0, false));
+        ccfgs.add(cacheConfiguration(REPLICATED, TRANSACTIONAL, 0, false));
+
+        ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 1, false));
+        ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 1, true));
+        ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 2, false));
+
+        ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1, false));
+        ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1, true));
+        ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 2, false));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @param backups Number of backups.
+     * @param nearEnabled {@code True} if near cache should be enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        int backups,
+        boolean nearEnabled) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        if (cacheMode != REPLICATED) {
+            ccfg.setBackups(backups);
+
+            if (nearEnabled)
+                ccfg.setNearConfiguration(new NearCacheConfiguration<>());
+        }
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
index 42b5ee3..48fc961 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
@@ -21,26 +21,19 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -69,7 +62,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
 
         cfg.setClientMode(client);
 
-        TestCommunicationSpi commSpi = new TestCommunicationSpi();
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
 
         cfg.setCommunicationSpi(commSpi);
 
@@ -156,7 +149,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
 
         Ignite node = cache.unwrap(Ignite.class);
 
-        TestCommunicationSpi spi = (TestCommunicationSpi)node.configuration().getCommunicationSpi();
+        TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi();
 
         spi.record(GridNearSingleGetRequest.class);
 
@@ -164,17 +157,24 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
 
         assertNotSame(node, primary);
 
-        TestCommunicationSpi primarySpi = (TestCommunicationSpi)primary.configuration().getCommunicationSpi();
+        TestRecordingCommunicationSpi primarySpi =
+            (TestRecordingCommunicationSpi)primary.configuration().getCommunicationSpi();
 
         primarySpi.record(GridNearSingleGetResponse.class);
 
         assertNull(cache.get(key));
 
-        checkMessages(spi, primarySpi);
+        if (backup)
+            checkNoMessages(spi, primarySpi);
+        else
+            checkMessages(spi, primarySpi);
 
         assertFalse(cache.containsKey(key));
 
-        checkMessages(spi, primarySpi);
+        if (backup)
+            checkNoMessages(spi, primarySpi);
+        else
+            checkMessages(spi, primarySpi);
 
         cache.put(key, 1);
 
@@ -201,7 +201,10 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
                 tx.commit();
             }
 
-            checkMessages(spi, primarySpi);
+            if (backup)
+                checkNoMessages(spi, primarySpi);
+            else
+                checkMessages(spi, primarySpi);
 
             try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
                 assertFalse(cache.containsKey(key));
@@ -209,7 +212,10 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
                 tx.commit();
             }
 
-            checkMessages(spi, primarySpi);
+            if (backup)
+                checkNoMessages(spi, primarySpi);
+            else
+                checkMessages(spi, primarySpi);
 
             cache.put(key, 1);
 
@@ -241,7 +247,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
      * @param spi Near node SPI.
      * @param primarySpi Primary node SPI.
      */
-    private void checkMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) {
+    private void checkMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) {
         List<Object> msgs = spi.recordedMessages();
 
         assertEquals(1, msgs.size());
@@ -257,7 +263,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
      * @param spi Near node SPI.
      * @param primarySpi Primary node SPI.
      */
-    private void checkNoMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) {
+    private void checkNoMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) {
         List<Object> msgs = spi.recordedMessages();
         assertEquals(0, msgs.size());
 
@@ -306,52 +312,4 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
 
         return ccfg;
     }
-
-    /**
-     *
-     */
-    private static class TestCommunicationSpi extends TcpCommunicationSpi {
-        /** */
-        private Class<?> recordCls;
-
-        /** */
-        private List<Object> recordedMsgs = new ArrayList<>();
-
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
-            throws IgniteSpiException {
-            if (msg instanceof GridIoMessage) {
-                Object msg0 = ((GridIoMessage)msg).message();
-
-                synchronized (this) {
-                    if (recordCls != null && msg0.getClass().equals(recordCls))
-                        recordedMsgs.add(msg0);
-                }
-            }
-
-            super.sendMessage(node, msg, ackC);
-        }
-
-        /**
-         * @param recordCls Message class to record.
-         */
-        void record(@Nullable Class<?> recordCls) {
-            synchronized (this) {
-                this.recordCls = recordCls;
-            }
-        }
-
-        /**
-         * @return Recorded messages.
-         */
-        List<Object> recordedMessages() {
-            synchronized (this) {
-                List<Object> msgs = recordedMsgs;
-
-                recordedMsgs = new ArrayList<>();
-
-                return msgs;
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
index 68cac17..94613db 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -300,6 +300,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
             throws CacheLoaderException {
         }
 
+        /** {@inheritDoc} */
         @Override public void sessionEnd(boolean commit) throws CacheWriterException {
             evts.offer("sessionEnd " + commit);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
index fd94150..0666349 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java
@@ -17,25 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -79,7 +71,11 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest
         c.setDiscoverySpi(disco);
         c.setCacheConfiguration(cc);
 
-        c.setCommunicationSpi(new TestCommunicationSpi());
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        commSpi.record(GridDhtPartitionsSingleMessage.class);
+
+        c.setCommunicationSpi(commSpi);
 
         return c;
     }
@@ -110,11 +106,13 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest
         IgniteCache<String, Integer> c1 = g1.cache(null);
         IgniteCache<String, Integer> c2 = g2.cache(null);
 
-        TestCommunicationSpi spi0 = (TestCommunicationSpi)g0.configuration().getCommunicationSpi();
-        TestCommunicationSpi spi1 = (TestCommunicationSpi)g1.configuration().getCommunicationSpi();
-        TestCommunicationSpi spi2 = (TestCommunicationSpi)g2.configuration().getCommunicationSpi();
+        TestRecordingCommunicationSpi spi0 = (TestRecordingCommunicationSpi)g0.configuration().getCommunicationSpi();
+        TestRecordingCommunicationSpi spi1 = (TestRecordingCommunicationSpi)g1.configuration().getCommunicationSpi();
+        TestRecordingCommunicationSpi spi2 = (TestRecordingCommunicationSpi)g2.configuration().getCommunicationSpi();
 
-        info(spi0.sentMessages().size() + " " + spi1.sentMessages().size() + " " + spi2.sentMessages().size());
+        info(spi0.recordedMessages().size() + " " +
+            spi1.recordedMessages().size() + " " +
+            spi2.recordedMessages().size());
 
         checkCache(c0, cnt);
         checkCache(c1, cnt);
@@ -137,40 +135,4 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest
                 assertEquals(Integer.valueOf(i), c.localPeek(key, CachePeekMode.ONHEAP));
         }
     }
-
-    /**
-     * Communication SPI that will count single partition update messages.
-     */
-    private static class TestCommunicationSpi extends TcpCommunicationSpi {
-        /** Recorded messages. */
-        private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>();
-
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
-            throws IgniteSpiException {
-            recordMessage((GridIoMessage)msg);
-
-            super.sendMessage(node, msg, ackClosure);
-        }
-
-        /**
-         * @return Collection of sent messages.
-         */
-        public Collection<GridDhtPartitionsSingleMessage> sentMessages() {
-            return sentMsgs;
-        }
-
-        /**
-         * Adds message to a list if message is of correct type.
-         *
-         * @param msg Message.
-         */
-        private void recordMessage(GridIoMessage msg) {
-            if (msg.message() instanceof GridDhtPartitionsSingleMessage) {
-                GridDhtPartitionsSingleMessage partSingleMsg = (GridDhtPartitionsSingleMessage)msg.message();
-
-                sentMsgs.add(partSingleMsg);
-            }
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
index 7bd845a..3e6a245 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java
@@ -107,7 +107,11 @@ public class GridCacheGetStoreErrorSelfTest extends GridCommonAbstractTest {
         checkGetError(false, LOCAL);
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * @param nearEnabled Near cache flag.
+     * @param cacheMode Cache mode.
+     * @throws Exception If failed.
+     */
     private void checkGetError(boolean nearEnabled, CacheMode cacheMode) throws Exception {
         this.nearEnabled = nearEnabled;
         this.cacheMode = cacheMode;
@@ -147,14 +151,17 @@ public class GridCacheGetStoreErrorSelfTest extends GridCommonAbstractTest {
      */
     @SuppressWarnings("PublicInnerClass")
     public static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
         @Override public Object load(Object key) {
             throw new IgniteException("Failed to get key from store: " + key);
         }
 
+        /** {@inheritDoc} */
         @Override public void write(Cache.Entry<?, ?> entry) {
             // No-op.
         }
 
+        /** {@inheritDoc} */
         @Override public void delete(Object key) {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
index 684d6e4..f32a5f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java
@@ -23,7 +23,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheAbstractNodeRestartSelfTest;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 
 /**
@@ -46,7 +46,7 @@ public class GridCachePartitionedNodeRestartTest extends GridCacheAbstractNodeRe
         cc.setName(CACHE_NAME);
         cc.setAtomicityMode(atomicityMode());
         cc.setCacheMode(PARTITIONED);
-        cc.setWriteSynchronizationMode(FULL_ASYNC);
+        cc.setWriteSynchronizationMode(FULL_SYNC);
         cc.setNearConfiguration(null);
         cc.setStartSize(20);
         cc.setRebalanceMode(rebalancMode);

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
index a458aa7..ab7caad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheAbstract
 import org.apache.ignite.transactions.TransactionConcurrency;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 
 /**
@@ -53,7 +53,7 @@ public class GridCachePartitionedOptimisticTxNodeRestartTest extends GridCacheAb
 
         cc.setName(CACHE_NAME);
         cc.setCacheMode(PARTITIONED);
-        cc.setWriteSynchronizationMode(FULL_ASYNC);
+        cc.setWriteSynchronizationMode(FULL_SYNC);
         cc.setStartSize(20);
         cc.setRebalanceMode(rebalancMode);
         cc.setRebalanceBatchSize(rebalancBatchSize);

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
index de87e99..0513786 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPutAllFailov
 import org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGetRestartTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest;
 
 /**
@@ -45,6 +46,8 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class);
 
+        suite.addTestSuite(IgniteCacheGetRestartTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 04d0881..68e52df 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHan
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheReadFromBackupTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
@@ -289,6 +290,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheGetFutureHangsSelfTest.class);
 
         suite.addTestSuite(IgniteCacheSingleGetMessageTest.class);
+        suite.addTestSuite(IgniteCacheReadFromBackupTest.class);
 
         suite.addTestSuite(IgniteCacheGetCustomCollectionsSelfTest.class);
         suite.addTestSuite(IgniteCacheLoadRebalanceEvictionSelfTest.class);


[2/2] ignite git commit: ignite-1811 Optimized cache 'get' on affinity node.

Posted by sb...@apache.org.
ignite-1811 Optimized cache 'get' on affinity node.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83b2bf5e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83b2bf5e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83b2bf5e

Branch: refs/heads/master
Commit: 83b2bf5e1f287dc83343945b0e47b83ee7724a8e
Parents: d85616b
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jan 18 18:05:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jan 18 18:05:37 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  30 +-
 .../processors/cache/GridCacheContext.java      |  33 ++
 .../dht/CacheDistributedGetFutureAdapter.java   |  28 +-
 .../dht/GridClientPartitionTopology.java        |   2 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  27 +-
 .../dht/GridPartitionedGetFuture.java           | 241 ++++++-----
 .../dht/GridPartitionedSingleGetFuture.java     | 229 ++++++----
 .../dht/atomic/GridDhtAtomicCache.java          |  26 ++
 .../distributed/near/GridNearGetFuture.java     | 267 +++++++-----
 .../cache/transactions/IgniteTxManager.java     |  18 +-
 .../internal/TestRecordingCommunicationSpi.java | 157 +++++++
 ...idCacheConfigurationConsistencySelfTest.java |  58 +--
 .../cache/IgniteCacheNearLockValueSelfTest.java |  62 +--
 .../cache/IgniteCacheStoreCollectionTest.java   |  12 +
 ...eDynamicCacheStartNoExchangeTimeoutTest.java |   7 +
 ...ridCachePartitionNotLoadedEventSelfTest.java |   7 +-
 .../IgniteCacheAtomicNodeRestartTest.java       |   2 +
 ...niteCacheClientNodeChangingTopologyTest.java |   4 +-
 .../distributed/IgniteCacheGetRestartTest.java  | 280 ++++++++++++
 .../IgniteCacheReadFromBackupTest.java          | 427 +++++++++++++++++++
 .../IgniteCacheSingleGetMessageTest.java        |  88 +---
 .../IgniteCrossCacheTxStoreSelfTest.java        |   1 +
 .../GridCacheDhtPreloadMessageCountTest.java    |  62 +--
 .../near/GridCacheGetStoreErrorSelfTest.java    |   9 +-
 .../GridCachePartitionedNodeRestartTest.java    |   4 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |   4 +-
 .../IgniteCacheRestartTestSuite2.java           |   3 +
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 28 files changed, 1524 insertions(+), 566 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5d4c386..2582e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4540,9 +4540,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Cached value.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public V get(K key, boolean deserializeBinary)
-        throws IgniteCheckedException {
-        return getAsync(key, deserializeBinary).get();
+    @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException {
+        checkJta();
+
+        String taskName = ctx.kernalContext().job().currentTaskName();
+
+        return get(key, taskName, deserializeBinary);
+    }
+
+    /**
+     * @param key Key.
+     * @param taskName Task name.
+     * @param deserializeBinary Deserialize binary flag.
+     * @return Cached value.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected V get(
+        final K key,
+        String taskName,
+        boolean deserializeBinary) throws IgniteCheckedException {
+        return getAsync(key,
+            !ctx.config().isReadFromBackup(),
+            /*skip tx*/false,
+            null,
+            taskName,
+            deserializeBinary,
+            false,
+            /*can remap*/true).get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index c10ebf3..fc48b9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -111,6 +111,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  * Cache context.
@@ -1434,6 +1435,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return {@code True} if store and read-through mode are enabled in configuration.
+     */
+    public boolean readThroughConfigured() {
+        return store().configured() && cacheCfg.isReadThrough();
+    }
+
+    /**
      * @return {@code True} if {@link CacheConfiguration#isLoadPreviousValue()} flag is set.
      */
     public boolean loadPreviousValue() {
@@ -1961,6 +1969,31 @@ public class GridCacheContext<K, V> implements Externalizable {
         });
     }
 
+    /**
+     * @param part Partition.
+     * @param affNodes Affinity nodes.
+     * @param topVer Topology version.
+     * @return {@code True} if cache 'get' operation is allowed to get entry locally.
+     */
+    public boolean allowFastLocalRead(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+        return affinityNode() && rebalanceEnabled() && hasPartition(part, affNodes, topVer);
+    }
+
+    /**
+     * @param part Partition.
+     * @param affNodes Affinity nodes.
+     * @param topVer Topology version.
+     * @return {@code True} if partition is available locally.
+     */
+    private boolean hasPartition(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+        assert affinityNode();
+
+        GridDhtPartitionTopology top = topology();
+
+        return (top.rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode)))
+            || (top.partitionState(localNodeId(), part) == OWNING);
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, gridName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index c43cce9..40eec63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -39,6 +40,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  *
@@ -168,14 +170,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
     /**
      * Affinity node to send get request to.
      *
-     * @param key Key to get.
-     * @param topVer Topology version.
+     * @param affNodes All affinity nodes.
      * @return Affinity node to get key from.
      */
-    protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+    protected final ClusterNode affinityNode(List<ClusterNode> affNodes) {
         if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
             for (ClusterNode node : affNodes) {
                 if (cctx.discovery().alive(node))
                     return node;
@@ -184,6 +183,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
             return null;
         }
         else
-            return cctx.affinity().primary(key, topVer);
+            return affNodes.get(0);
+    }
+
+    /**
+     * @param part Partition.
+     * @return {@code True} if partition is in owned state.
+     */
+    protected final boolean partitionOwned(int part) {
+        return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Exception.
+     */
+    protected final ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+        return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+            "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 8aef5ad..dcfc038 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -882,6 +882,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        assert false : "Should not be called on non-affinity node";
+
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a0709c5..2ab8a12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -88,7 +88,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private GridDhtPartitionExchangeId lastExchangeId;
 
     /** */
-    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** */
     private volatile boolean stopping;
@@ -136,9 +136,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             topReadyFut = null;
 
-            topVer = AffinityTopologyVersion.NONE;
-
             rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            topVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -223,13 +223,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             this.stopping = stopping;
 
-            topVer = exchId.topologyVersion();
-
             updateSeq.setIfGreater(updSeq);
 
             topReadyFut = exchFut;
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            topVer = exchId.topologyVersion();
         }
         finally {
             lock.writeLock().unlock();
@@ -238,17 +238,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
-        lock.readLock().lock();
+        AffinityTopologyVersion topVer = this.topVer;
 
-        try {
-            assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
-                ", cacheName=" + cctx.name() + ']';
+        assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+            ", cacheName=" + cctx.name() + ']';
 
-            return topVer;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
+        return topVer;
     }
 
     /** {@inheritDoc} */
@@ -1336,7 +1331,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
-        return topVer.equals(rebalancedTopVer);
+        AffinityTopologyVersion curTopVer = this.topVer;
+
+        return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 19df1c2..1f2d7c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -234,15 +235,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
         AffinityTopologyVersion topVer
     ) {
-        if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+        Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer);
+
+        if (cacheNodes.isEmpty()) {
             onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                 "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
 
             return;
         }
 
-        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings =
-            U.newHashMap(CU.affinityNodes(cctx, topVer).size());
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size());
 
         final int keysSize = keys.size();
 
@@ -374,135 +376,160 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         AffinityTopologyVersion topVer,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
     ) {
-        GridDhtCacheAdapter<K, V> colocated = cache();
+        int part = cctx.affinity().partition(key);
 
-        boolean remote = false;
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
 
-        // Allow to get cached value from the local node.
-        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
-                cctx.affinity().primary(cctx.localNode(), key, topVer);
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
 
-        while (true) {
-            GridCacheEntryEx entry;
+            return false;
+        }
 
-            try {
-                if (allowLocRead) {
-                    try {
-                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                            colocated.peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-
-                                if (res != null) {
-                                    v = res.get1();
-                                    ver = res.get2();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(null,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
+        boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+            cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                            colocated.context().evicts().touch(entry, topVer);
+        if (fastLocGet && localGet(key, part, locVals))
+            return false;
 
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                if (isNew && entry.markObsoleteIfEmpty(ver))
-                                    colocated.removeIfObsolete(key);
-                            }
-                            else {
-                                if (needVer)
-                                    versionedResult(locVals, key, v, ver);
-                                else
-                                    cctx.addResult(locVals,
-                                        key,
-                                        v,
-                                        skipVals,
-                                        keepCacheObjects,
-                                        deserializeBinary,
-                                        true);
-
-                                return false;
-                            }
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        // No-op.
-                    }
-                }
+        ClusterNode node = affinityNode(affNodes);
 
-                ClusterNode node = affinityNode(key, topVer);
+        if (node == null) {
+            onDone(serverNotFoundError(topVer));
 
-                if (node == null) {
-                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid)."));
+            return false;
+        }
 
-                    return false;
-                }
+        boolean remote = !node.isLocal();
+
+        LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+
+        if (keys != null && keys.containsKey(key)) {
+            if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+                onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+                    MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+                    U.toShortString(node) + ", mappings=" + mapped + ']'));
+
+                return false;
+            }
+        }
+
+        LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+
+        if (old == null)
+            mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+
+        old.put(key, false);
+
+        return remote;
+    }
 
-                remote = !node.isLocal();
+    /**
+     * @param key Key.
+     * @param part Partition.
+     * @param locVals Local values.
+     * @return {@code True} if there is no need to further search value.
+     */
+    private boolean localGet(KeyCacheObject key, int part, Map<K, V> locVals) {
+        assert cctx.affinityNode() : this;
 
-                LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+        GridDhtCacheAdapter<K, V> cache = cache();
 
-                if (keys != null && keys.containsKey(key)) {
-                    if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
-                        onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
-                            MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
-                            U.toShortString(node) + ", mappings=" + mapped + ']'));
+        while (true) {
+            GridCacheEntryEx entry;
 
-                        return false;
+            try {
+                entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+                // If our DHT cache do has value, then we peek it.
+                if (entry != null) {
+                    boolean isNew = entry.isNewLocked();
+
+                    CacheObject v = null;
+                    GridCacheVersion ver = null;
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
+                        }
+                    }
+                    else {
+                        v = entry.innerGet(null,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+                    }
+
+                    cache.context().evicts().touch(entry, topVer);
+
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null) {
+                        if (isNew && entry.markObsoleteIfEmpty(ver))
+                            cache.removeIfObsolete(key);
+                    }
+                    else {
+                        if (needVer)
+                            versionedResult(locVals, key, v, ver);
+                        else {
+                            cctx.addResult(locVals,
+                                key,
+                                v,
+                                skipVals,
+                                keepCacheObjects,
+                                deserializeBinary,
+                                true);
+                        }
+
+                        return true;
                     }
                 }
 
-                LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
 
-                if (old == null)
-                    mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+                // Entry not found, do not continue search if topology did not change and there is no store.
+                if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {
+                    if (!skipVals && cctx.config().isStatisticsEnabled())
+                        cache.metrics0().onRead(false);
 
-                old.put(key, false);
+                    return true;
+                }
 
-                break;
+                return false;
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, will retry.
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                return false;
             }
             catch (IgniteCheckedException e) {
                 onDone(e);
 
-                break;
-            }
-            catch (GridCacheEntryRemovedException ignored) {
-                // No-op, will retry.
+                return true;
             }
         }
-
-        return remote;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 29971fd..0c811ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -58,6 +58,8 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
 /**
  *
  */
@@ -319,105 +321,140 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
      * @return Primary node or {@code null} if future was completed.
      */
     @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
-        ClusterNode primary = affinityNode(key, topVer);
+        int part = cctx.affinity().partition(key);
+
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
 
-        if (primary == null) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
 
             return null;
         }
 
-        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
-
-        if (allowLocRead) {
-            GridDhtCacheAdapter colocated = cctx.dht();
-
-            while (true) {
-                GridCacheEntryEx entry;
-
-                try {
-                    entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                        colocated.peekEx(key);
-
-                    // If our DHT cache do has value, then we peek it.
-                    if (entry != null) {
-                        boolean isNew = entry.isNewLocked();
-
-                        CacheObject v = null;
-                        GridCacheVersion ver = null;
-
-                        if (needVer) {
-                            T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
-                                null,
-                                /*swap*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc,
-                                true);
-
-                            if (res != null) {
-                                v = res.get1();
-                                ver = res.get2();
-                            }
-                        }
-                        else {
-                            v = entry.innerGet(null,
-                                /*swap*/true,
-                                /*read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                /*temporary*/false,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc,
-                                true);
-                        }
+        boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+            cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                        colocated.context().evicts().touch(entry, topVer);
+        if (fastLocGet && localGet(topVer, part))
+            return null;
 
-                        // Entry was not in memory or in swap, so we remove it from cache.
-                        if (v == null) {
-                            if (isNew && entry.markObsoleteIfEmpty(ver))
-                                colocated.removeIfObsolete(key);
-                        }
-                        else {
-                            if (!skipVals && cctx.config().isStatisticsEnabled())
-                                cctx.cache().metrics0().onRead(true);
+        ClusterNode affNode = affinityNode(affNodes);
+
+        if (affNode == null) {
+            onDone(serverNotFoundError(topVer));
+
+            return null;
+        }
+
+        return affNode;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param part Partition.
+     * @return {@code True} if future completed.
+     */
+    private boolean localGet(AffinityTopologyVersion topVer, int part) {
+        assert cctx.affinityNode() : this;
+
+        GridDhtCacheAdapter colocated = cctx.dht();
 
-                            if (!skipVals)
-                                setResult(v, ver);
-                            else
-                                setSkipValueResult(true, ver);
+        while (true) {
+            GridCacheEntryEx entry;
 
-                            return null;
+            try {
+                entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                    colocated.peekEx(key);
+
+                // If our DHT cache do has value, then we peek it.
+                if (entry != null) {
+                    boolean isNew = entry.isNewLocked();
+
+                    CacheObject v = null;
+                    GridCacheVersion ver = null;
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            true);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
                         }
                     }
+                    else {
+                        v = entry.innerGet(null,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            true);
+                    }
 
-                    break;
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    break;
-                }
-                catch (IgniteCheckedException e) {
-                    onDone(e);
+                    colocated.context().evicts().touch(entry, topVer);
+
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null) {
+                        if (isNew && entry.markObsoleteIfEmpty(ver))
+                            colocated.removeIfObsolete(key);
+                    }
+                    else {
+                        if (!skipVals && cctx.config().isStatisticsEnabled())
+                            cctx.cache().metrics0().onRead(true);
+
+                        if (!skipVals)
+                            setResult(v, ver);
+                        else
+                            setSkipValueResult(true, ver);
 
-                    return null;
+                        return true;
+                    }
                 }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // No-op, will retry.
+
+                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+
+                // Entry not found, complete future with null result if topology did not change and there is no store.
+                if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {
+                    if (!skipVals && cctx.config().isStatisticsEnabled())
+                        colocated.metrics0().onRead(false);
+
+                    if (skipVals)
+                        setSkipValueResult(false, null);
+                    else
+                        setResult(null, null);
+
+                    return true;
                 }
+
+                return false;
             }
-        }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, will retry.
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                return false;
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
 
-        return primary;
+                return true;
+            }
+        }
     }
 
     /**
@@ -595,7 +632,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
                 }
                 else {
                     if (!keepCacheObjects) {
-                        Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary && !skipVals);
+                        Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary);
 
                         onDone(res);
                     }
@@ -612,16 +649,30 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
     }
 
     /**
+     * @param part Partition.
+     * @return {@code True} if partition is in owned state.
+     */
+    private boolean partitionOwned(int part) {
+        return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Exception.
+     */
+    private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+        return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+            "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
+    }
+
+    /**
      * Affinity node to send get request to.
      *
-     * @param key Key to get.
-     * @param topVer Topology version.
+     * @param affNodes All affinity nodes.
      * @return Affinity node to get key from.
      */
-    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+    @Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) {
         if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
             for (ClusterNode node : affNodes) {
                 if (cctx.discovery().alive(node))
                     return node;
@@ -630,7 +681,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
             return null;
         }
         else
-            return cctx.affinity().primary(key, topVer);
+            return affNodes.get(0);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 393413e..81fd5d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -317,6 +317,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException {
+        ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        UUID subjId = ctx.subjectIdPerCall(null, opCtx);
+
+        final ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+
+        final boolean skipStore = opCtx != null && opCtx.skipStore();
+
+        return getAsync0(ctx.toCacheKeyObject(key),
+            !ctx.config().isReadFromBackup(),
+            subjId,
+            taskName,
+            deserializeBinary,
+            expiryPlc,
+            false,
+            skipStore,
+            true).get();
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteInternalFuture<V> getAsync(final K key,
         final boolean forcePrimary,
         final boolean skipTx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index c547a88..9291001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -405,10 +406,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
         Map<KeyCacheObject, GridNearCacheEntry> saved
     ) {
+        int part = cctx.affinity().partition(key);
+
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
+
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
+
+            return null;
+        }
+
         final GridNearCacheAdapter near = cache();
 
         // Allow to get cached value from the local node.
-        boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+        boolean allowLocRead = !forcePrimary || cctx.localNode().equals(affNodes.get(0));
 
         while (true) {
             GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null;
@@ -456,124 +467,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     }
                 }
 
-                ClusterNode affNode = null;
-
-                if (v == null && allowLocRead && cctx.affinityNode()) {
-                    GridDhtCacheAdapter<K, V> dht = cache().dht();
-
-                    GridCacheEntryEx dhtEntry = null;
-
-                    try {
-                        dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
-
-                        // If near cache does not have value, then we peek DHT cache.
-                        if (dhtEntry != null) {
-                            boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
-
-                            if (needVer) {
-                                T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!isNear && !skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-
-                                if (res != null) {
-                                    v = res.get1();
-                                    ver = res.get2();
-                                }
-                            }
-                            else {
-                                v = dhtEntry.innerGet(tx,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
-                                    /*update-metrics*/false,
-                                    /*events*/!isNear && !skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
-                                dht.removeIfObsolete(key);
-                        }
-
-                        if (v != null) {
-                            if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
-                                near.metrics0().onRead(true);
-                        }
-                        else {
-                            affNode = affinityNode(key, topVer);
-
-                            if (affNode == null) {
-                                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                                    "(all partition nodes left the grid)."));
-
-                                return saved;
-                            }
-
-                            if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
-                                near.metrics0().onRead(false);
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
-                        // No-op.
-                    }
-                    finally {
-                        if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) {
-                            dht.context().evicts().touch(dhtEntry, topVer);
-
-                            entry = null;
-                        }
-                    }
-                }
-
-                if (v != null) {
-                    if (needVer) {
-                        V val0 = (V)new T2<>(skipVals ? true : v, ver);
+                if (v == null) {
+                    boolean fastLocGet = allowLocRead && cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                        add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
-                    }
-                    else {
-                        if (keepCacheObjects) {
-                            K key0 = (K)key;
-                            V val0 = (V)(skipVals ? true : v);
+                    if (fastLocGet && localDhtGet(key, part, topVer, isNear))
+                        break;
 
-                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
-                        }
-                        else {
-                            K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
-                            V val0 = !skipVals ?
-                                (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
-                                (V)Boolean.TRUE;
+                    ClusterNode affNode = affinityNode(affNodes);
 
-                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
-                        }
-                    }
-                }
-                else {
                     if (affNode == null) {
-                        affNode = affinityNode(key, topVer);
-
-                        if (affNode == null) {
-                            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                                "(all partition nodes left the grid)."));
+                        onDone(serverNotFoundError(topVer));
 
-                            return saved;
-                        }
+                        return saved;
                     }
 
+                    if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals && !affNode.isLocal())
+                        cache().metrics0().onRead(false);
+
                     LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
 
                     if (keys != null && keys.containsKey(key)) {
@@ -586,7 +496,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         }
                     }
 
-                    if (!cctx.affinity().localNode(key, topVer)) {
+                    if (!affNodes.contains(cctx.localNode())) {
                         GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer);
 
                         nearEntry.reserveEviction();
@@ -612,6 +522,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
                     old.put(key, addRdr);
                 }
+                else
+                    addResult(key, v, ver);
 
                 break;
             }
@@ -633,6 +545,135 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
     }
 
     /**
+     * @param key Key.
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @param nearRead {@code True} if already tried to read from near cache.
+     * @return {@code True} if there is no need to further search value.
+     */
+    private boolean localDhtGet(KeyCacheObject key,
+        int part,
+        AffinityTopologyVersion topVer,
+        boolean nearRead) {
+        GridDhtCacheAdapter<K, V> dht = cache().dht();
+
+        assert dht.context().affinityNode() : this;
+
+        while (true) {
+            GridCacheEntryEx dhtEntry = null;
+
+            try {
+                dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
+
+                CacheObject v = null;
+
+                // If near cache does not have value, then we peek DHT cache.
+                if (dhtEntry != null) {
+                    boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!nearRead && !skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
+                        }
+                    }
+                    else {
+                        v = dhtEntry.innerGet(tx,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /*update-metrics*/false,
+                            /*events*/!nearRead && !skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+                    }
+
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
+                        dht.removeIfObsolete(key);
+                }
+
+                if (v != null) {
+                    if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+                        cache().metrics0().onRead(true);
+
+                    addResult(key, v, ver);
+
+                    return true;
+                }
+                else {
+                    boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+
+                    // Entry not found, do not continue search if topology did not change and there is no store.
+                    return !cctx.readThroughConfigured() && (topStable || partitionOwned(part));
+                }
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // Retry.
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                return false;
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
+
+                return false;
+            }
+            finally {
+                if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
+                    dht.context().evicts().touch(dhtEntry, topVer);
+            }
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param v Value.
+     * @param ver Version.
+     */
+    @SuppressWarnings("unchecked")
+    private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) {
+        if (needVer) {
+            V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+            add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+        }
+        else {
+            if (keepCacheObjects) {
+                K key0 = (K)key;
+                V val0 = (V)(skipVals ? true : v);
+
+                add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+            }
+            else {
+                K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
+                V val0 = !skipVals ?
+                    (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
+                    (V)Boolean.TRUE;
+
+                add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+            }
+        }
+    }
+
+    /**
      * @return Near cache.
      */
     private GridNearCacheAdapter<K, V> cache() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ca15e20..7a3b8ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -619,17 +619,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 return topVer;
         }
 
-        for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
-            if (!cacheCtx.systemTx())
-                continue;
+        if (!sysThreadMap.isEmpty()) {
+            for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
+                if (!cacheCtx.systemTx())
+                    continue;
 
-            tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
+                tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
 
-            if (tx != null && tx != ignore) {
-                AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+                if (tx != null && tx != ignore) {
+                    AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
 
-                if (topVer != null)
-                    return topVer;
+                    if (topVer != null)
+                        return topVer;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
new file mode 100644
index 0000000..8a602ad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
+
+/**
+ *
+ */
+public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
+    /** */
+    private Class<?> recordCls;
+
+    /** */
+    private List<Object> recordedMsgs = new ArrayList<>();
+
+    /** */
+    private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+    /** */
+    private Map<Class<?>, Set<String>> blockCls = new HashMap<>();
+
+    /** */
+    private IgnitePredicate<GridIoMessage> blockP;
+
+    /** {@inheritDoc} */
+    @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+        throws IgniteSpiException {
+        if (msg instanceof GridIoMessage) {
+            GridIoMessage ioMsg = (GridIoMessage)msg;
+
+            Object msg0 = ioMsg.message();
+
+            synchronized (this) {
+                if (recordCls != null && msg0.getClass().equals(recordCls))
+                    recordedMsgs.add(msg0);
+
+                boolean block = false;
+
+                if (blockP != null && blockP.apply(ioMsg))
+                    block = true;
+                else {
+                    Set<String> blockNodes = blockCls.get(msg0.getClass());
+
+                    if (blockNodes != null) {
+                        String nodeName = (String)node.attributes().get(ATTR_GRID_NAME);
+
+                        block = blockNodes.contains(nodeName);
+                    }
+                }
+
+                if (block) {
+                    blockedMsgs.add(new T2<>(node, ioMsg));
+
+                    return;
+                }
+            }
+        }
+
+        super.sendMessage(node, msg, ackC);
+    }
+
+    /**
+     * @param recordCls Message class to record.
+     */
+    public void record(@Nullable Class<?> recordCls) {
+        synchronized (this) {
+            this.recordCls = recordCls;
+        }
+    }
+
+    /**
+     * @return Recorded messages.
+     */
+    public List<Object> recordedMessages() {
+        synchronized (this) {
+            List<Object> msgs = recordedMsgs;
+
+            recordedMsgs = new ArrayList<>();
+
+            return msgs;
+        }
+    }
+
+    /**
+     * @param blockP Message block predicate.
+     */
+    public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
+        synchronized (this) {
+            this.blockP = blockP;
+        }
+    }
+
+    /**
+     * @param cls Message class.
+     * @param nodeName Node name.
+     */
+    public void blockMessages(Class<?> cls, String nodeName) {
+        synchronized (this) {
+            Set<String> set = blockCls.get(cls);
+
+            if (set == null) {
+                set = new HashSet<>();
+
+                blockCls.put(cls, set);
+            }
+
+            set.add(nodeName);
+        }
+    }
+
+    /**
+     * Stops block messages and sends all already blocked messages.
+     */
+    public void stopBlock() {
+        synchronized (this) {
+            blockCls.clear();
+
+            for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs)
+                super.sendMessage(msg.get1(), msg.get2());
+
+            blockedMsgs.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
index e28e89f..a1f917f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
@@ -19,25 +19,19 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.io.Externalizable;
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import javax.cache.Cache;
-import javax.cache.integration.CacheLoaderException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheInterceptorAdapter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver;
-import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.eviction.EvictionFilter;
 import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
 import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
 import org.apache.ignite.cache.eviction.random.RandomEvictionPolicy;
-import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DeploymentMode;
@@ -46,7 +40,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -54,7 +47,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridStringLogger;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -862,49 +854,9 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac
         }, IgniteCheckedException.class, null);
     }
 
-    /** */
-    private static class TestStore implements CacheStore<Object,Object> {
-        /** {@inheritDoc} */
-        @Nullable @Override public Object load(Object key) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
-            return Collections.emptyMap();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Cache.Entry<?, ?> entry) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void delete(Object key) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void deleteAll(Collection<?> keys) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void sessionEnd(boolean commit) {
-            // No-op.
-        }
-    }
-
+    /**
+     *
+     */
     private static class TestRendezvousAffinityFunction extends RendezvousAffinityFunction {
         /**
          * Empty constructor required by {@link Externalizable}.
@@ -941,6 +893,10 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac
         // No-op, just different class.
     }
 
+    /**
+     *
+     */
     private static class TestCacheDefaultAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper {
+        // No-op, just different class.
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
index 100acfe..f106fec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
@@ -18,22 +18,15 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -71,7 +64,11 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
         if (getTestGridName(0).equals(gridName))
             cfg.setClientMode(true);
 
-        cfg.setCommunicationSpi(new TestCommunicationSpi());
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        commSpi.record(GridNearLockRequest.class);
+
+        cfg.setCommunicationSpi(commSpi);
 
         return cfg;
     }
@@ -88,18 +85,18 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
             cache.put("key1", "val1");
 
             for (int i = 0; i < 3; i++) {
-                ((TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).clear();
-                ((TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).clear();
-
                 try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                     cache.get("key1");
 
                     tx.commit();
                 }
 
-                TestCommunicationSpi comm = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+                TestRecordingCommunicationSpi comm =
+                    (TestRecordingCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+                Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages();
 
-                assertEquals(1, comm.requests().size());
+                assertEquals(1, reqs.size());
 
                 GridCacheAdapter<Object, Object> primary = ((IgniteKernal)grid(1)).internalCache("partitioned");
 
@@ -107,7 +104,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
 
                 assertNotNull(dhtEntry);
 
-                GridNearLockRequest req = comm.requests().iterator().next();
+                GridNearLockRequest req = reqs.iterator().next();
 
                 assertEquals(dhtEntry.version(), req.dhtVersion(0));
 
@@ -122,39 +119,4 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
             }
         }
     }
-
-    /**
-     *
-     */
-    private static class TestCommunicationSpi extends TcpCommunicationSpi {
-        /** */
-        private Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>();
-
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
-            throws IgniteSpiException {
-            if (msg instanceof GridIoMessage) {
-                GridIoMessage ioMsg = (GridIoMessage)msg;
-
-                if (ioMsg.message() instanceof GridNearLockRequest)
-                    reqs.add((GridNearLockRequest)ioMsg.message());
-            }
-
-            super.sendMessage(node, msg, ackC);
-        }
-
-        /**
-         * @return Collected requests.
-         */
-        public Collection<GridNearLockRequest> requests() {
-            return reqs;
-        }
-
-        /**
-         *
-         */
-        public void clear() {
-            reqs.clear();
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
index 57d57ca..48acdfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
@@ -22,29 +22,41 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 
 /**
  *
  */
 public class IgniteCacheStoreCollectionTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         CacheConfiguration<Object, Object> ccfg1 = new CacheConfiguration<>();
         ccfg1.setName("cache1");
         ccfg1.setAtomicityMode(ATOMIC);
+        ccfg1.setWriteSynchronizationMode(FULL_SYNC);
 
         CacheConfiguration<Object, Object> ccfg2 = new CacheConfiguration<>();
         ccfg2.setName("cache2");
         ccfg2.setAtomicityMode(TRANSACTIONAL);
+        ccfg2.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(ccfg1, ccfg2);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
index 9acc4b5..ac80d69 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
@@ -46,6 +46,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  *
@@ -344,6 +345,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-1");
             ccfg.setAtomicityMode(ATOMIC);
             ccfg.setBackups(0);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -354,6 +356,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-2");
             ccfg.setAtomicityMode(ATOMIC);
             ccfg.setBackups(1);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -365,6 +368,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setAtomicityMode(ATOMIC);
             ccfg.setBackups(1);
             ccfg.setAffinity(new FairAffinityFunction());
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -375,6 +379,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-4");
             ccfg.setAtomicityMode(TRANSACTIONAL);
             ccfg.setBackups(0);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -385,6 +390,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-5");
             ccfg.setAtomicityMode(TRANSACTIONAL);
             ccfg.setBackups(1);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -396,6 +402,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setAtomicityMode(TRANSACTIONAL);
             ccfg.setBackups(1);
             ccfg.setAffinity(new FairAffinityFunction());
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
index 5bc779c..6a42752 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -22,7 +22,6 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -42,6 +41,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.util.TestTcpCommunicationSpi;
 import org.eclipse.jetty.util.ConcurrentHashSet;
 
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
 /**
  *
  */
@@ -76,8 +78,9 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
 
-        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setCacheMode(PARTITIONED);
         cacheCfg.setBackups(backupCnt);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(cacheCfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
index 327db0e..37ed866 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
@@ -31,10 +31,12 @@ public class IgniteCacheAtomicNodeRestartTest extends GridCachePartitionedNodeRe
         return ATOMIC;
     }
 
+    /** {@inheritDoc} */
     @Override public void testRestartWithPutFourNodesNoBackups() {
         fail("https://issues.apache.org/jira/browse/IGNITE-1587");
     }
 
+    /** {@inheritDoc} */
     @Override public void testRestartWithPutFourNodesOneBackupsOffheapTiered() {
         fail("https://issues.apache.org/jira/browse/IGNITE-1587");
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index e7657a6..13f2598 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -2010,7 +2010,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
         private List<Object> recordedMsgs = new ArrayList<>();
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             if (msg instanceof GridIoMessage) {
                 Object msg0 = ((GridIoMessage)msg).message();
@@ -2032,7 +2032,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
                 }
             }
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**