You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 10:21:40 UTC

[1/2] ignite git commit: ignite-5578 Affinity for local join

Repository: ignite
Updated Branches:
  refs/heads/ignite-5578-locJoin 84d10aeb7 -> b5319a02a


ignite-5578 Affinity for local join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4bd1ee8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4bd1ee8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4bd1ee8

Branch: refs/heads/ignite-5578-locJoin
Commit: d4bd1ee8e9ec74ab8c4ddbe48509e526f4223632
Parents: 83c779b
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 13 11:51:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 13 13:20:24 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   8 +
 .../GridCachePartitionExchangeManager.java      |  32 +-
 .../dht/GridClientPartitionTopology.java        |   2 +-
 .../dht/GridDhtPartitionTopology.java           |   3 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   5 +-
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../ignite/internal/util/GridListSet.java       |   8 +
 .../CacheLateAffinityAssignmentTest.java        |  39 +-
 .../distributed/CachePartitionStateTest.java    | 354 ++++++++++++++++++-
 .../TestCacheNodeExcludingFilter.java           |  53 +++
 10 files changed, 450 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 879e6a9a..8a293ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -517,6 +517,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
+        for (DynamicCacheDescriptor desc : startDescs) {
+            CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+            assert grp != null;
+
+            grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+        }
+
         cctx.cache().initCacheProxies(topVer, null);
 
         cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 51214e3..06f336e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -1438,19 +1439,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return;
 
         try {
-            List<GridDhtPartitionsExchangeFuture> futs = exchangeFutures();
-
-            GridDhtPartitionsExchangeFuture fut = null;
-
-            for (int i = futs.size() - 1; i >= 0; i++) {
-                GridDhtPartitionsExchangeFuture fut0 = futs.get(i);
-
-                if (fut0.exchangeId().equals(msg.exchangeId())) {
-                    fut = fut0;
-
-                    break;
-                }
-            }
+            GridDhtPartitionsExchangeFuture fut = exchFuts.find(msg.exchangeId());
 
             if (fut != null)
                 fut.processSinglePartitionRequest(node, msg);
@@ -2266,6 +2255,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             return super.values();
         }
 
+        /**
+         * @param exchangeId Exchange ID to look for; the set is scanned from its last element backwards.
+         * @return Future with the given exchange ID, or {@code null} if the set is empty or has no such future.
+         */
+        public synchronized GridDhtPartitionsExchangeFuture find(GridDhtPartitionExchangeId exchangeId) {
+            // Cursor starts past the tail: previous() yields the element before the cursor (also safe when empty).
+            ListIterator<GridDhtPartitionsExchangeFuture> it = listIterator(size());
+            while (it.hasPrevious()) {
+                GridDhtPartitionsExchangeFuture fut0 = it.previous();
+
+                if (fut0.exchangeId().equals(exchangeId))
+                    return fut0;
+            }
+
+            return null;
+        }
+
         /** {@inheritDoc} */
         @Override public synchronized String toString() {
             return S.toString(ExchangeFutureSet.class, this, super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c1a465d..232ce38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -835,7 +835,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(AffinityAssignment assignment) {
+    @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index caf3936..d9e04a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -340,6 +340,7 @@ public interface GridDhtPartitionTopology {
      * Callback on exchange done.
      *
      * @param assignment New affinity assignment.
+     * @param updateRebalanceVer {@code True} if need check rebalance state.
      */
-    public void onExchangeDone(AffinityAssignment assignment);
+    public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 9d16d90..5ef499c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1509,12 +1509,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeDone(AffinityAssignment assignment) {
+    @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
         lock.writeLock().lock();
 
         try {
             if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
                 rebuildDiff(assignment);
+
+            if (updateRebalanceVer)
+                updateRebalanceVersion(assignment.assignment());
         }
         finally {
             lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a8d1589..513f950 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1347,7 +1347,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         if (err == null) {
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                 if (!grp.isLocal())
-                    grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()));
+                    grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
index 6226bd2..1a632b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
@@ -373,6 +373,14 @@ public class GridListSet<V> extends GridSerializableSet<V> implements Cloneable
         return vals.iterator();
     }
 
+    /**
+     * @param idx Start index, passed unchanged to {@code vals.listIterator(int)}.
+     * @return List iterator obtained from the backing {@code vals} collection.
+     */
+    public ListIterator<V> listIterator(int idx) {
+        return vals.listIterator(idx);
+    }
+
     /** {@inheritDoc} */
     @Override public int size() {
         return vals.size();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 840dda1..a1a01e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -100,7 +100,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 
 /**
  *
@@ -332,7 +331,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
 
         testAffinitySimpleSequentialStart();
 
@@ -352,7 +351,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1)));
 
         startServer(0, 1);
 
@@ -392,7 +391,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
 
         startServer(0, 1);
         startServer(1, 2);
@@ -440,7 +439,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
 
         Ignite ignite0 = startServer(0, 1);
 
@@ -468,7 +467,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         };
 
-        cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
 
         Ignite ignite0 = startServer(0, 1);
 
@@ -521,7 +520,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
      */
     private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception {
         if (!cacheOnCrd)
-            cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
+            cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
 
         startServer(0, 1);
 
@@ -1904,7 +1903,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
         startServer(0, 1);
 
         cacheC = null;
-        cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
+        cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
 
         startServer(1, 2);
 
@@ -2093,7 +2092,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
                     exclude.add("server-" + (srvIdx + rnd.nextInt(10)));
             }
 
-            ccfg.setNodeFilter(new CacheNodeFilter(exclude));
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude));
         }
 
         ccfg.setName(name);
@@ -2669,28 +2668,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
     /**
      *
      */
-    static class CacheNodeFilter implements IgnitePredicate<ClusterNode> {
-        /** */
-        private Collection<String> excludeNodes;
-
-        /**
-         * @param excludeNodes Nodes names.
-         */
-        public CacheNodeFilter(Collection<String> excludeNodes) {
-            this.excludeNodes = excludeNodes;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(ClusterNode clusterNode) {
-            String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
-
-            return !excludeNodes.contains(name);
-        }
-    }
-
-    /**
-     *
-     */
     static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
         /** */
         private boolean blockCustomEvt;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
index c4f3f4a..c64ed0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
@@ -17,10 +17,24 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -28,6 +42,11 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
 /**
  *
  */
@@ -35,22 +54,321 @@ public class CachePartitionStateTest extends GridCommonAbstractTest {
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    private boolean client;
+
+    /** */
+    private CacheConfiguration ccfg;
+
     /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration() throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration();
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
         cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
 
+        cfg.setClientMode(client);
+
+        if (ccfg != null) {
+            cfg.setCacheConfiguration(ccfg);
+
+            ccfg = null; // Reset so the cache configuration is applied by a single getConfiguration() call.
+        }
+
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
     /**
      * @throws Exception If failed.
      */
-    public void testPartitionState1() throws Exception {
+    public void testPartitionState1_1() throws Exception {
+        partitionState1(0, true);
+    }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState1_2() throws Exception {
+        partitionState1(1, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState1_2_NoCacheOnCoordinator() throws Exception {
+        partitionState1(1, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState1_3() throws Exception {
+        partitionState1(100, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_1() throws Exception {
+        partitionState2(0, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_2() throws Exception {
+        partitionState2(1, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_2_NoCacheOnCoordinator() throws Exception {
+        partitionState2(1, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionState2_3() throws Exception {
+        partitionState2(100, true);
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param crdAffNode If {@code false} cache is not created on coordinator.
+     * @throws Exception If failed.
+     */
+    private void partitionState1(int backups, boolean crdAffNode) throws Exception {
+        startGrids(3);
+
+        blockSupplySend(DEFAULT_CACHE_NAME);
+
+        CacheConfiguration ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+        if (!crdAffNode)
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+        ignite(1).createCache(ccfg);
+
+        AffinityAssignment assign0 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(3, 1));
+
+        awaitPartitionMapExchange();
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        client = true;
+
+        Ignite clientNode = startGrid(4);
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        clientNode.cache(DEFAULT_CACHE_NAME);
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        client = false;
+
+        startGrid(5);
+
+        checkRebalance(DEFAULT_CACHE_NAME, false);
+
+        for (int i = 0; i < 3; i++)
+            checkNodePartitions(assign0, ignite(i).cluster().localNode(), DEFAULT_CACHE_NAME, OWNING);
+
+        AffinityAssignment assign1 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(5, 0));
+
+        checkNodePartitions(assign1, ignite(5).cluster().localNode(), DEFAULT_CACHE_NAME, MOVING);
+
+        stopBlock();
+
+        awaitPartitionMapExchange();
+
+        AffinityAssignment assign2 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(5, 1));
+
+        checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        if (!crdAffNode)
+            ignite(0).cache(DEFAULT_CACHE_NAME);
+
+        checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        startGrid(6);
+
+        awaitPartitionMapExchange();
+
+        AffinityAssignment assign3 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(6, 1));
+
+        checkPartitionsState(assign3, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param crdAffNode If {@code false} cache is not created on coordinator.
+     * @throws Exception If failed.
+     */
+    private void partitionState2(int backups, boolean crdAffNode) throws Exception {
+        startGrids(3);
+
+        blockSupplySend(DEFAULT_CACHE_NAME);
+
+        ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+        if (!crdAffNode)
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+        startGrid(4);
+
+        AffinityAssignment assign0 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(4, 0));
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        if (!crdAffNode)
+            ignite(0).cache(DEFAULT_CACHE_NAME);
+
+        checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+
+        stopBlock();
+
+        startGrid(5);
+
+        AffinityAssignment assign1 =
+            grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+                new AffinityTopologyVersion(5, 1));
+
+        awaitPartitionMapExchange();
+
+        checkPartitionsState(assign1, DEFAULT_CACHE_NAME, OWNING);
+
+        checkRebalance(DEFAULT_CACHE_NAME, true);
+    }
+
+    /**
+     * @param assign Assignments.
+     * @param cacheName Cache name.
+     * @param expState Expected state.
+     */
+    private void checkPartitionsState(AffinityAssignment assign, String cacheName, GridDhtPartitionState expState) {
+        for (Ignite node : G.allGrids())
+            checkNodePartitions(assign, node.cluster().localNode(), cacheName, expState);
+    }
+
+    /**
+     * @param assign Assignments.
+     * @param clusterNode Node.
+     * @param cacheName Cache name.
+     * @param expState Expected partitions state.
+     */
+    private void checkNodePartitions(AffinityAssignment assign,
+        ClusterNode clusterNode,
+        String cacheName,
+        GridDhtPartitionState expState)
+    {
+        Affinity<Object> aff = ignite(0).affinity(cacheName);
+
+        Set<Integer> nodeParts = new HashSet<>();
+
+        nodeParts.addAll(assign.primaryPartitions(clusterNode.id()));
+        nodeParts.addAll(assign.backupPartitions(clusterNode.id()));
+
+        log.info("Test state [node=" + clusterNode.id() + ", parts=" + nodeParts.size() + ", state=" + expState + ']');
+
+        if (grid(0).context().discovery().cacheAffinityNode(clusterNode, cacheName))
+            assertFalse(nodeParts.isEmpty());
+
+        boolean check = false;
+
+        for (Ignite node : G.allGrids()) {
+            GridCacheAdapter cache =
+                ((IgniteKernal)node).context().cache().internalCache(cacheName);
+
+            if (cache != null) {
+                check = true;
+
+                GridDhtPartitionTopology top = cache.context().topology();
+
+                GridDhtPartitionMap partsMap = top.partitions(clusterNode.id());
+
+                for (int p = 0; p < aff.partitions(); p++) {
+                    if (nodeParts.contains(p)) {
+                        assertNotNull(partsMap);
+                        assertEquals(expState, partsMap.get(p));
+                    }
+                    else {
+                        if (partsMap != null) {
+                            GridDhtPartitionState state = partsMap.get(p);
+
+                            assertTrue("Unexpected state: " + state, state == null || state == EVICTED);
+                        }
+                    }
+                }
+            }
+            else {
+                assertEquals(0, aff.primaryPartitions(((IgniteKernal)node).localNode()).length);
+                assertEquals(0, aff.backupPartitions(((IgniteKernal)node).localNode()).length);
+            }
+        }
+
+        assertTrue(check);
+    }
+
+    /**
+     * @param cacheName Name of the cache checked on every grid returned by {@code G.allGrids()}.
+     * @param expDone Expected {@code rebalanceFinished} result for each node's ready affinity version.
+     */
+    private void checkRebalance(String cacheName, boolean expDone) {
+        for (Ignite node : G.allGrids()) {
+            IgniteKernal node0 = (IgniteKernal)node;
+
+            GridCacheAdapter cache = node0.context().cache().internalCache(cacheName);
+
+            AffinityTopologyVersion topVer = node0.context().cache().context().exchange().readyAffinityVersion();
+
+            if (cache != null)
+                assertEquals(expDone, cache.context().topology().rebalanceFinished(topVer));
+            else // NOTE(review): the result below is discarded; presumably an assertFalse was intended - confirm.
+                node0.context().discovery().cacheAffinityNode(node0.localNode(), cacheName);
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     */
+    private void blockSupplySend(String cacheName) {
+        for (Ignite node : G.allGrids())
+            blockSupplySend(TestRecordingCommunicationSpi.spi(node), cacheName);
     }
 
     /**
@@ -58,15 +376,35 @@ public class CachePartitionStateTest extends GridCommonAbstractTest {
      * @param cacheName Cache name.
      */
     private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
-        final int grpId = groupIdForCache(spi.ignite(), cacheName);
+        final int grpId = CU.cacheId(cacheName);
 
         spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
             @Override public boolean apply(ClusterNode node, Message msg) {
-                if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class))
-                    return false;
-
-                return ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId;
+                return msg.getClass().equals(GridDhtPartitionSupplyMessage.class) &&
+                    ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId;
             }
         });
     }
+
+    /**
+     *
+     */
+    private void stopBlock() {
+        for (Ignite node : G.allGrids())
+            TestRecordingCommunicationSpi.spi(node).stopBlock();
+    }
+
+    /**
+     * @param name Cache name.
+     * @param backups Backups number.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration(name);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
new file mode 100644
index 0000000..a3f7d27
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ * Node filter that rejects nodes whose Ignite instance name is contained in the configured exclusion list.
+ */
+public class TestCacheNodeExcludingFilter implements IgnitePredicate<ClusterNode> {
+    /** Instance names of the nodes this filter rejects. */
+    private Collection<String> excludeNodes;
+
+    /**
+     * @param excludeNodes Instance names of nodes to exclude; the collection is stored as is, without copying.
+     */
+    public TestCacheNodeExcludingFilter(Collection<String> excludeNodes) {
+        this.excludeNodes = excludeNodes;
+    }
+    /**
+     * @param excludeNodes Instance names of nodes to exclude; wrapped with {@code Arrays.asList}.
+     */
+    public TestCacheNodeExcludingFilter(String... excludeNodes) {
+        this.excludeNodes = Arrays.asList(excludeNodes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(ClusterNode clusterNode) {
+        String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
+
+        return !excludeNodes.contains(name);
+    }
+}


[2/2] ignite git commit: Merge remote-tracking branch 'origin/ignite-5578-locJoin' into ignite-5578-locJoin

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-5578-locJoin' into ignite-5578-locJoin

# Conflicts:
#	modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5319a02
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5319a02
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5319a02

Branch: refs/heads/ignite-5578-locJoin
Commit: b5319a02a5f247db3979b8d7e85ca736a7fae127
Parents: d4bd1ee 84d10ae
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 13 13:21:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 13 13:21:16 2017 +0300

----------------------------------------------------------------------

----------------------------------------------------------------------