You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/13 10:21:40 UTC
[1/2] ignite git commit: ignite-5578 Affinity for local join
Repository: ignite
Updated Branches:
refs/heads/ignite-5578-locJoin 84d10aeb7 -> b5319a02a
ignite-5578 Affinity for local join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4bd1ee8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4bd1ee8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4bd1ee8
Branch: refs/heads/ignite-5578-locJoin
Commit: d4bd1ee8e9ec74ab8c4ddbe48509e526f4223632
Parents: 83c779b
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 13 11:51:43 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 13 13:20:24 2017 +0300
----------------------------------------------------------------------
.../cache/CacheAffinitySharedManager.java | 8 +
.../GridCachePartitionExchangeManager.java | 32 +-
.../dht/GridClientPartitionTopology.java | 2 +-
.../dht/GridDhtPartitionTopology.java | 3 +-
.../dht/GridDhtPartitionTopologyImpl.java | 5 +-
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../ignite/internal/util/GridListSet.java | 8 +
.../CacheLateAffinityAssignmentTest.java | 39 +-
.../distributed/CachePartitionStateTest.java | 354 ++++++++++++++++++-
.../TestCacheNodeExcludingFilter.java | 53 +++
10 files changed, 450 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 879e6a9a..8a293ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -517,6 +517,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
}
+ for (DynamicCacheDescriptor desc : startDescs) {
+ CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+ assert grp != null;
+
+ grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true);
+ }
+
cctx.cache().initCacheProxies(topVer, null);
cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 51214e3..06f336e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -1438,19 +1439,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return;
try {
- List<GridDhtPartitionsExchangeFuture> futs = exchangeFutures();
-
- GridDhtPartitionsExchangeFuture fut = null;
-
- for (int i = futs.size() - 1; i >= 0; i++) {
- GridDhtPartitionsExchangeFuture fut0 = futs.get(i);
-
- if (fut0.exchangeId().equals(msg.exchangeId())) {
- fut = fut0;
-
- break;
- }
- }
+ GridDhtPartitionsExchangeFuture fut = exchFuts.find(msg.exchangeId());
if (fut != null)
fut.processSinglePartitionRequest(node, msg);
@@ -2266,6 +2255,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
return super.values();
}
+ /**
+ * @param exchangeId Exchange ID.
+ * @return Future.
+ */
+ public synchronized GridDhtPartitionsExchangeFuture find(GridDhtPartitionExchangeId exchangeId) {
+ ListIterator<GridDhtPartitionsExchangeFuture> it = listIterator(size() - 1);
+
+ while (it.hasPrevious()) {
+ GridDhtPartitionsExchangeFuture fut0 = it.previous();
+
+ if (fut0.exchangeId().equals(exchangeId))
+ return fut0;
+ }
+
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public synchronized String toString() {
return S.toString(ExchangeFutureSet.class, this, super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c1a465d..232ce38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -835,7 +835,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(AffinityAssignment assignment) {
+ @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index caf3936..d9e04a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -340,6 +340,7 @@ public interface GridDhtPartitionTopology {
* Callback on exchange done.
*
* @param assignment New affinity assignment.
+ * @param updateRebalanceVer {@code True} if need check rebalance state.
*/
- public void onExchangeDone(AffinityAssignment assignment);
+ public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 9d16d90..5ef499c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1509,12 +1509,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public void onExchangeDone(AffinityAssignment assignment) {
+ @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) {
lock.writeLock().lock();
try {
if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
rebuildDiff(assignment);
+
+ if (updateRebalanceVer)
+ updateRebalanceVersion(assignment.assignment());
}
finally {
lock.writeLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a8d1589..513f950 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1347,7 +1347,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (err == null) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal())
- grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()));
+ grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
index 6226bd2..1a632b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
@@ -373,6 +373,14 @@ public class GridListSet<V> extends GridSerializableSet<V> implements Cloneable
return vals.iterator();
}
+ /**
+ * @param idx Start index.
+ * @return List iterator.
+ */
+ public ListIterator<V> listIterator(int idx) {
+ return vals.listIterator(idx);
+ }
+
/** {@inheritDoc} */
@Override public int size() {
return vals.size();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 840dda1..a1a01e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -100,7 +100,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
/**
*
@@ -332,7 +331,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
testAffinitySimpleSequentialStart();
@@ -352,7 +351,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1)));
startServer(0, 1);
@@ -392,7 +391,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2)));
startServer(0, 1);
startServer(1, 2);
@@ -440,7 +439,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
Ignite ignite0 = startServer(0, 1);
@@ -468,7 +467,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
}
};
- cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0)));
Ignite ignite0 = startServer(0, 1);
@@ -521,7 +520,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
*/
private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception {
if (!cacheOnCrd)
- cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
startServer(0, 1);
@@ -1904,7 +1903,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
startServer(0, 1);
cacheC = null;
- cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
+ cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0)));
startServer(1, 2);
@@ -2093,7 +2092,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
exclude.add("server-" + (srvIdx + rnd.nextInt(10)));
}
- ccfg.setNodeFilter(new CacheNodeFilter(exclude));
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude));
}
ccfg.setName(name);
@@ -2669,28 +2668,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
/**
*
*/
- static class CacheNodeFilter implements IgnitePredicate<ClusterNode> {
- /** */
- private Collection<String> excludeNodes;
-
- /**
- * @param excludeNodes Nodes names.
- */
- public CacheNodeFilter(Collection<String> excludeNodes) {
- this.excludeNodes = excludeNodes;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(ClusterNode clusterNode) {
- String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
-
- return !excludeNodes.contains(name);
- }
- }
-
- /**
- *
- */
static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
private boolean blockCustomEvt;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
index c4f3f4a..c64ed0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
@@ -17,10 +17,24 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -28,6 +42,11 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
/**
*
*/
@@ -35,22 +54,321 @@ public class CachePartitionStateTest extends GridCommonAbstractTest {
/** */
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ /** */
+ private boolean client;
+
+ /** */
+ private CacheConfiguration ccfg;
+
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration() throws Exception {
- IgniteConfiguration cfg = super.getConfiguration();
+ protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+ cfg.setClientMode(client);
+
+ if (ccfg != null) {
+ cfg.setCacheConfiguration(ccfg);
+
+ ccfg = null;
+ }
+
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
/**
* @throws Exception If failed.
*/
- public void testPartitionState1() throws Exception {
+ public void testPartitionState1_1() throws Exception {
+ partitionState1(0, true);
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_2() throws Exception {
+ partitionState1(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_2_NoCacheOnCoordinator() throws Exception {
+ partitionState1(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState1_3() throws Exception {
+ partitionState1(100, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_1() throws Exception {
+ partitionState2(0, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_2() throws Exception {
+ partitionState2(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_2_NoCacheOnCoordinator() throws Exception {
+ partitionState2(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionState2_3() throws Exception {
+ partitionState2(100, true);
+ }
+
+ /**
+ * @param backups Number of backups.
+ * @param crdAffNode If {@code false} cache is not created on coordinator.
+ * @throws Exception If failed.
+ */
+ private void partitionState1(int backups, boolean crdAffNode) throws Exception {
+ startGrids(3);
+
+ blockSupplySend(DEFAULT_CACHE_NAME);
+
+ CacheConfiguration ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+ if (!crdAffNode)
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+ ignite(1).createCache(ccfg);
+
+ AffinityAssignment assign0 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(3, 1));
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ client = true;
+
+ Ignite clientNode = startGrid(4);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ clientNode.cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ client = false;
+
+ startGrid(5);
+
+ checkRebalance(DEFAULT_CACHE_NAME, false);
+
+ for (int i = 0; i < 3; i++)
+ checkNodePartitions(assign0, ignite(i).cluster().localNode(), DEFAULT_CACHE_NAME, OWNING);
+
+ AffinityAssignment assign1 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 0));
+
+ checkNodePartitions(assign1, ignite(5).cluster().localNode(), DEFAULT_CACHE_NAME, MOVING);
+
+ stopBlock();
+
+ awaitPartitionMapExchange();
+
+ AffinityAssignment assign2 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 1));
+
+ checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ if (!crdAffNode)
+ ignite(0).cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ startGrid(6);
+
+ awaitPartitionMapExchange();
+
+ AffinityAssignment assign3 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(6, 1));
+
+ checkPartitionsState(assign3, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+ }
+
+ /**
+ * @param backups Number of backups.
+ * @param crdAffNode If {@code false} cache is not created on coordinator.
+ * @throws Exception If failed.
+ */
+ private void partitionState2(int backups, boolean crdAffNode) throws Exception {
+ startGrids(3);
+
+ blockSupplySend(DEFAULT_CACHE_NAME);
+
+ ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups);
+
+ if (!crdAffNode)
+ ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+ startGrid(4);
+
+ AffinityAssignment assign0 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(4, 0));
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ if (!crdAffNode)
+ ignite(0).cache(DEFAULT_CACHE_NAME);
+
+ checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+
+ stopBlock();
+
+ startGrid(5);
+
+ AffinityAssignment assign1 =
+ grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment(
+ new AffinityTopologyVersion(5, 1));
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsState(assign1, DEFAULT_CACHE_NAME, OWNING);
+
+ checkRebalance(DEFAULT_CACHE_NAME, true);
+ }
+
+ /**
+ * @param assign Assignments.
+ * @param cacheName Cache name.
+ * @param expState Expected state.
+ */
+ private void checkPartitionsState(AffinityAssignment assign, String cacheName, GridDhtPartitionState expState) {
+ for (Ignite node : G.allGrids())
+ checkNodePartitions(assign, node.cluster().localNode(), cacheName, expState);
+ }
+
+ /**
+ * @param assign Assignments.
+ * @param clusterNode Node.
+ * @param cacheName Cache name.
+ * @param expState Expected partitions state.
+ */
+ private void checkNodePartitions(AffinityAssignment assign,
+ ClusterNode clusterNode,
+ String cacheName,
+ GridDhtPartitionState expState)
+ {
+ Affinity<Object> aff = ignite(0).affinity(cacheName);
+
+ Set<Integer> nodeParts = new HashSet<>();
+
+ nodeParts.addAll(assign.primaryPartitions(clusterNode.id()));
+ nodeParts.addAll(assign.backupPartitions(clusterNode.id()));
+
+ log.info("Test state [node=" + clusterNode.id() + ", parts=" + nodeParts.size() + ", state=" + expState + ']');
+
+ if (grid(0).context().discovery().cacheAffinityNode(clusterNode, cacheName))
+ assertFalse(nodeParts.isEmpty());
+
+ boolean check = false;
+
+ for (Ignite node : G.allGrids()) {
+ GridCacheAdapter cache =
+ ((IgniteKernal)node).context().cache().internalCache(cacheName);
+
+ if (cache != null) {
+ check = true;
+
+ GridDhtPartitionTopology top = cache.context().topology();
+
+ GridDhtPartitionMap partsMap = top.partitions(clusterNode.id());
+
+ for (int p = 0; p < aff.partitions(); p++) {
+ if (nodeParts.contains(p)) {
+ assertNotNull(partsMap);
+ assertEquals(expState, partsMap.get(p));
+ }
+ else {
+ if (partsMap != null) {
+ GridDhtPartitionState state = partsMap.get(p);
+
+ assertTrue("Unexpected state: " + state, state == null || state == EVICTED);
+ }
+ }
+ }
+ }
+ else {
+ assertEquals(0, aff.primaryPartitions(((IgniteKernal)node).localNode()).length);
+ assertEquals(0, aff.backupPartitions(((IgniteKernal)node).localNode()).length);
+ }
+ }
+
+ assertTrue(check);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param expDone Expected rebalance finish flag.
+ */
+ private void checkRebalance(String cacheName, boolean expDone) {
+ for (Ignite node : G.allGrids()) {
+ IgniteKernal node0 = (IgniteKernal)node;
+
+ GridCacheAdapter cache = node0.context().cache().internalCache(cacheName);
+
+ AffinityTopologyVersion topVer = node0.context().cache().context().exchange().readyAffinityVersion();
+
+ if (cache != null)
+ assertEquals(expDone, cache.context().topology().rebalanceFinished(topVer));
+ else
+ node0.context().discovery().cacheAffinityNode(node0.localNode(), cacheName);
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ */
+ private void blockSupplySend(String cacheName) {
+ for (Ignite node : G.allGrids())
+ blockSupplySend(TestRecordingCommunicationSpi.spi(node), cacheName);
}
/**
@@ -58,15 +376,35 @@ public class CachePartitionStateTest extends GridCommonAbstractTest {
* @param cacheName Cache name.
*/
private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
- final int grpId = groupIdForCache(spi.ignite(), cacheName);
+ final int grpId = CU.cacheId(cacheName);
spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
- if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class))
- return false;
-
- return ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId;
+ return msg.getClass().equals(GridDhtPartitionSupplyMessage.class) &&
+ ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId;
}
});
}
+
+ /**
+ *
+ */
+ private void stopBlock() {
+ for (Ignite node : G.allGrids())
+ TestRecordingCommunicationSpi.spi(node).stopBlock();
+ }
+
+ /**
+ * @param name Cache name.
+ * @param backups Backups number.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name, int backups) {
+ CacheConfiguration ccfg = new CacheConfiguration(name);
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(backups);
+
+ return ccfg;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
new file mode 100644
index 0000000..a3f7d27
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ *
+ */
+public class TestCacheNodeExcludingFilter implements IgnitePredicate<ClusterNode> {
+ /** */
+ private Collection<String> excludeNodes;
+
+ /**
+ * @param excludeNodes Nodes names.
+ */
+ public TestCacheNodeExcludingFilter(Collection<String> excludeNodes) {
+ this.excludeNodes = excludeNodes;
+ }
+ /**
+ * @param excludeNodes Nodes names.
+ */
+ public TestCacheNodeExcludingFilter(String... excludeNodes) {
+ this.excludeNodes = Arrays.asList(excludeNodes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString();
+
+ return !excludeNodes.contains(name);
+ }
+}
[2/2] ignite git commit: Merge remote-tracking branch
'origin/ignite-5578-locJoin' into ignite-5578-locJoin
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-5578-locJoin' into ignite-5578-locJoin
# Conflicts:
# modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5319a02
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5319a02
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5319a02
Branch: refs/heads/ignite-5578-locJoin
Commit: b5319a02a5f247db3979b8d7e85ca736a7fae127
Parents: d4bd1ee 84d10ae
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jul 13 13:21:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jul 13 13:21:16 2017 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------