You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/24 03:12:48 UTC
[1/3] ignite git commit: IGNITE-1016 - Exclude neighbors flag for
fair affinity. Fixes #80.
Repository: ignite
Updated Branches:
refs/heads/ignite-1016 [created] 8f5a4c9de
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 1deb3bc..df657e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -22,10 +22,10 @@ import junit.framework.TestSuite;
import org.apache.ignite.GridCacheAffinityBackupsSelfTest;
import org.apache.ignite.IgniteCacheAffinitySelfTest;
import org.apache.ignite.cache.IgniteWarmupClosureSelfTest;
-import org.apache.ignite.cache.affinity.IgniteClientNodeAffinityTest;
-import org.apache.ignite.cache.affinity.fair.GridFairAffinityFunctionNodesSelfTest;
-import org.apache.ignite.cache.affinity.fair.GridFairAffinityFunctionSelfTest;
-import org.apache.ignite.cache.affinity.fair.IgniteFairAffinityDynamicCacheSelfTest;
+import org.apache.ignite.cache.affinity.AffinityClientNodeSelfTest;
+import org.apache.ignite.cache.affinity.fair.FairAffinityDynamicCacheSelfTest;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionNodesSelfTest;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionSelfTest;
import org.apache.ignite.cache.store.GridCacheBalancingStoreSelfTest;
import org.apache.ignite.cache.store.GridCacheLoadOnlyStoreAdapterSelfTest;
import org.apache.ignite.cache.store.StoreResourceInjectionSelfTest;
@@ -183,12 +183,12 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteWarmupClosureSelfTest.class);
// Affinity tests.
- suite.addTestSuite(GridFairAffinityFunctionNodesSelfTest.class);
- suite.addTestSuite(GridFairAffinityFunctionSelfTest.class);
- suite.addTestSuite(IgniteFairAffinityDynamicCacheSelfTest.class);
+ suite.addTestSuite(FairAffinityFunctionNodesSelfTest.class);
+ suite.addTestSuite(FairAffinityFunctionSelfTest.class);
+ suite.addTestSuite(FairAffinityDynamicCacheSelfTest.class);
suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class);
suite.addTestSuite(IgniteCacheAffinitySelfTest.class);
- suite.addTestSuite(IgniteClientNodeAffinityTest.class);
+ suite.addTestSuite(AffinityClientNodeSelfTest.class);
// Swap tests.
suite.addTestSuite(GridCacheSwapPreloadSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index bd24e51..93bd26c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -18,6 +18,10 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionBackupFilterSelfTest;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionExcludeNeighborsSelfTest;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
import org.apache.ignite.internal.processors.cache.CacheDhtLocalPartitionAfterRemoveSelfTest;
import org.apache.ignite.internal.processors.cache.CrossCacheTxRandomOperationsTest;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicMessageCountSelfTest;
@@ -32,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdate
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionNotLoadedEventSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionedAffinityFilterSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTransformEventSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodeChangingTopologyTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodePartitionsExchangeTest;
@@ -101,7 +104,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxSingleThreadedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxTimeoutSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityClientSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.NoneRebalanceModeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionSelfTest;
@@ -162,7 +164,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(GridCacheNearReaderPreloadSelfTest.class));
suite.addTest(new TestSuite(GridCacheAtomicNearReadersSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedAffinitySelfTest.class));
- suite.addTest(new TestSuite(GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest.class));
+ suite.addTest(new TestSuite(RendezvousAffinityFunctionExcludeNeighborsSelfTest.class));
+ suite.addTest(new TestSuite(FairAffinityFunctionExcludeNeighborsSelfTest.class));
suite.addTest(new TestSuite(GridCacheRendezvousAffinityClientSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedProjectionAffinitySelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedBasicOpSelfTest.class));
@@ -198,7 +201,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(GridCacheNearPreloadRestartSelfTest.class));
suite.addTest(new TestSuite(GridCacheDhtPreloadStartStopSelfTest.class));
suite.addTest(new TestSuite(GridCacheDhtPreloadUnloadSelfTest.class));
- suite.addTest(new TestSuite(GridCachePartitionedAffinityFilterSelfTest.class));
+ suite.addTest(new TestSuite(RendezvousAffinityFunctionBackupFilterSelfTest.class));
+ suite.addTest(new TestSuite(FairAffinityFunctionBackupFilterSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class));
suite.addTest(new TestSuite(CacheLoadingConcurrentGridStartSelfTest.class));
suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index cb39660..f3fbf15 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -179,6 +180,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
IgniteCacheReplicatedQuerySelfTest.CacheKey.class, IgniteCacheReplicatedQuerySelfTest.CacheValue.class
);
+ if (cacheMode() != CacheMode.LOCAL)
+ cc.setAffinity(new RendezvousAffinityFunction());
+
// Explicitly set number of backups equal to number of grids.
if (cacheMode() == CacheMode.PARTITIONED)
cc.setBackups(gridCount());
[2/3] ignite git commit: IGNITE-1016 - Exclude neighbors flag for
fair affinity. Fixes #80.
Posted by ag...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
deleted file mode 100644
index 888904b..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/IgniteClientNodeAffinityTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import java.util.Collection;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-
-/**
- *
- */
-public class IgniteClientNodeAffinityTest extends GridCommonAbstractTest {
- /** */
- protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final int NODE_CNT = 4;
-
- /** */
- private static final String CACHE1 = "cache1";
-
- /** */
- private static final String CACHE2 = "cache2";
-
- /** */
- private static final String CACHE3 = "cache3";
-
- /** */
- private static final String CACHE4 = "cache4";
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
- if (gridName.equals(getTestGridName(NODE_CNT - 1)))
- cfg.setClientMode(true);
-
- CacheConfiguration ccfg1 = new CacheConfiguration();
-
- ccfg1.setBackups(1);
- ccfg1.setName(CACHE1);
- ccfg1.setAffinity(new RendezvousAffinityFunction());
- ccfg1.setNodeFilter(new TestNodesFilter());
-
- CacheConfiguration ccfg2 = new CacheConfiguration();
-
- ccfg2.setBackups(1);
- ccfg2.setName(CACHE2);
- ccfg2.setAffinity(new RendezvousAffinityFunction());
-
- CacheConfiguration ccfg3 = new CacheConfiguration();
-
- ccfg3.setBackups(1);
- ccfg3.setName(CACHE3);
- ccfg3.setAffinity(new FairAffinityFunction());
- ccfg3.setNodeFilter(new TestNodesFilter());
-
- CacheConfiguration ccfg4 = new CacheConfiguration();
-
- ccfg4.setCacheMode(REPLICATED);
- ccfg4.setName(CACHE4);
- ccfg4.setNodeFilter(new TestNodesFilter());
-
- cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGrids(NODE_CNT);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientNodeNotInAffinity() throws Exception {
- checkCache(CACHE1, 2);
-
- checkCache(CACHE2, 2);
-
- checkCache(CACHE3, 2);
-
- checkCache(CACHE4, 3);
-
- Ignite client = ignite(NODE_CNT - 1);
-
- CacheConfiguration ccfg = new CacheConfiguration();
-
- ccfg.setBackups(0);
-
- ccfg.setNodeFilter(new TestNodesFilter());
-
- IgniteCache<Integer, Integer> cache = client.createCache(ccfg);
-
- try {
- checkCache(null, 1);
- }
- finally {
- cache.destroy();
- }
-
- cache = client.createCache(ccfg, new NearCacheConfiguration());
-
- try {
- checkCache(null, 1);
- }
- finally {
- cache.destroy();
- }
- }
-
- /**
- * @param cacheName Cache name.
- * @param expNodes Expected number of nodes per partition.
- */
- private void checkCache(String cacheName, int expNodes) {
- log.info("Test cache: " + cacheName);
-
- Ignite client = ignite(NODE_CNT - 1);
-
- assertTrue(client.configuration().isClientMode());
-
- ClusterNode clientNode = client.cluster().localNode();
-
- for (int i = 0; i < NODE_CNT; i++) {
- Ignite ignite = ignite(i);
-
- Affinity<Integer> aff = ignite.affinity(cacheName);
-
- for (int part = 0; part < aff.partitions(); part++) {
- Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part);
-
- assertEquals(expNodes, nodes.size());
-
- assertFalse(nodes.contains(clientNode));
- }
- }
- }
-
- /**
- *
- */
- private static class TestNodesFilter implements IgnitePredicate<ClusterNode> {
- /** {@inheritDoc} */
- @Override public boolean apply(ClusterNode clusterNode) {
- Boolean attr = clusterNode.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
-
- assertNotNull(attr);
-
- assertFalse(attr);
-
- return true;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java
new file mode 100644
index 0000000..0b32320
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.fair;
+
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that a cache created dynamically with {@link FairAffinityFunction} can be filled and then destroyed.
+ */
+public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every grid configuration built by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Calls the super constructor with {@code false}; NOTE(review): presumably disables grid auto-start - confirm. */
+ public FairAffinityDynamicCacheSelfTest(){
+ super(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(3);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartStopCache() throws Exception {
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+ cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+ cacheCfg.setBackups(1);
+ cacheCfg.setName("test");
+ cacheCfg.setAffinity(new FairAffinityFunction());
+
+ final IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg);
+
+ for (int i = 0; i < 10_000; i++)
+ cache.put(i, i);
+
+ IgniteInternalFuture<Object> destFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ignite(0).destroyCache(cache.getName());
+
+ return null;
+ }
+ });
+
+ destFut.get(2000L); // Bounded wait (2000, presumably ms - confirm) on the asynchronous destroy.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
new file mode 100644
index 0000000..eedc9e4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionBackupFilterSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.fair;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionBackupFilterAbstractSelfTest;
+
+/**
+ * Tests backup filter for {@link FairAffinityFunction}.
+ */
+public class FairAffinityFunctionBackupFilterSelfTest extends AffinityFunctionBackupFilterAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected AffinityFunction affinityFunction() {
+ FairAffinityFunction aff = new FairAffinityFunction(false); // NOTE(review): flag is presumably "exclude neighbors" - confirm.
+
+ aff.setBackupFilter(backupFilter); // backupFilter is not declared in this class; presumably inherited - confirm.
+
+ return aff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java
new file mode 100644
index 0000000..4182cd3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionExcludeNeighborsSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.fair;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionExcludeNeighborsAbstractSelfTest;
+
+/**
+ * Tests exclude neighbors flag for {@link FairAffinityFunction}.
+ */
+public class FairAffinityFunctionExcludeNeighborsSelfTest extends AffinityFunctionExcludeNeighborsAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected AffinityFunction affinityFunction() {
+ return new FairAffinityFunction(true); // NOTE(review): true presumably turns on the exclude-neighbors flag under test - confirm.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java
new file mode 100644
index 0000000..7420a0d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionNodesSelfTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.fair;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests partition fair affinity in real grid.
+ */
+public class FairAffinityFunctionNodesSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of backups; reassigned by {@link #checkSequence(boolean[])} before each run. */
+ private int backups;
+
+ /** Number of partitions. */
+ private int parts = 512;
+
+ /** Add-only sequence: each {@code true} starts one more node. */
+ private static final boolean[] ADD_ONLY = new boolean[] {true, true, true, true, true, true};
+
+ /** Add then remove sequence: {@code true} starts a node, {@code false} stops the highest started one. */
+ private static final boolean[] ADD_REMOVE = new boolean[]
+ {
+ true, true, true, true, true, true,
+ false, false, false, false, false
+ };
+
+ /** Mixed start/stop sequence 1; inline numbers are the count of nodes besides grid 0 after each step. */
+ private static final boolean[] MIXED1 = new boolean[]
+ {
+ // 1 2 3 2 3 4 3 4 5 4 3 2
+ true, true, true, false, true, true, false, true, true, false, false, false
+ };
+
+ /** Mixed start/stop sequence 2; inline numbers are the count of nodes besides grid 0 after each step. */
+ private static final boolean[] MIXED2 = new boolean[]
+ {
+ // 1 2 3 2 1 2 1 2 3 2 1 2
+ true, true, true, false, false, true, false, true, true, false, false, true
+ };
+
+ /** Mixed start/stop sequence 3; inline numbers are the count of nodes besides grid 0 after each step. */
+ private static final boolean[] MIXED3 = new boolean[]
+ {
+ // 1 2 3 4 5 6 5 6 7 8 9 8 7 8 9
+ true, true, true, true, true, true, false, true, true, true, true, false, false, true, true,
+ // 8 7 6
+ false, false, false
+ };
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = cacheConfiguration();
+
+ cfg.setCacheConfiguration(ccfg);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+ /**
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setBackups(backups);
+
+ cfg.setCacheMode(CacheMode.PARTITIONED);
+
+ cfg.setNearConfiguration(null);
+
+ cfg.setAffinity(new FairAffinityFunction(parts));
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAdd() throws Exception {
+ checkSequence(ADD_ONLY);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAddRemove() throws Exception {
+ checkSequence(ADD_REMOVE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMixed1() throws Exception {
+ checkSequence(MIXED1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMixed2() throws Exception {
+ checkSequence(MIXED2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMixed3() throws Exception {
+ checkSequence(MIXED3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkSequence(boolean[] seq) throws Exception {
+ for (int b = 0; b < 3; b++) { // Repeat the whole sequence with 0, 1 and 2 backups.
+ backups = b;
+
+ info(">>>>>>>>>>>>>>>> Checking backups: " + backups);
+
+ checkSequence0(seq);
+
+ info(">>>>>>>>>>>>>>>> Finished check: " + backups);
+ }
+ }
+
+ /**
+ * @param seq Start/stop sequence.
+ * @throws Exception If failed.
+ */
+ private void checkSequence0(boolean[] seq) throws Exception {
+ try {
+ startGrid(0);
+
+ TreeSet<Integer> started = new TreeSet<>();
+
+ started.add(0);
+
+ int topVer = 1; // Passed to affinityReadyFuture below; bumped after every start/stop.
+
+ for (boolean start : seq) {
+ if (start) {
+ int nextIdx = nextIndex(started);
+
+ startGrid(nextIdx);
+
+ started.add(nextIdx);
+ }
+ else {
+ int idx = started.last(); // Highest started index is the one that gets stopped.
+
+ stopGrid(idx);
+
+ started.remove(idx);
+ }
+
+ topVer++;
+
+ info("Grid 0: " + grid(0).localNode().id());
+
+ ((IgniteKernal)grid(0)).internalCache().context().affinity().affinityReadyFuture(topVer).get();
+
+ for (int i : started) {
+ if (i != 0) { // Grid 0 is the reference; every other started grid is compared against it.
+ IgniteEx grid = grid(i);
+
+ ((IgniteKernal)grid).internalCache().context().affinity().affinityReadyFuture(topVer).get();
+
+ info("Grid " + i + ": " + grid.localNode().id());
+
+ for (int part = 0; part < parts; part++) {
+ List<ClusterNode> firstNodes = (List<ClusterNode>)grid(0).affinity(null)
+ .mapPartitionToPrimaryAndBackups(part);
+
+ List<ClusterNode> secondNodes = (List<ClusterNode>)grid.affinity(null)
+ .mapPartitionToPrimaryAndBackups(part);
+
+ assertEquals(firstNodes.size(), secondNodes.size());
+
+ for (int n = 0; n < firstNodes.size(); n++) // Position-by-position: node order must match too.
+ assertEquals(firstNodes.get(n), secondNodes.get(n));
+ }
+ }
+ }
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * First positive integer that is not present in started set.
+ *
+ * @param started Already started indices.
+ * @return First positive integer that is not present in started set.
+ */
+ private int nextIndex(Collection<Integer> started) {
+ assert started.contains(0);
+
+ for (int i = 1; i < 10000; i++) {
+ if (!started.contains(i))
+ return i;
+ }
+
+ throw new IllegalStateException();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java
new file mode 100644
index 0000000..a79c9fc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunctionSelfTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.fair;
+
+import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+
+/**
+ * Supplies a default-constructed {@link FairAffinityFunction} to the tests inherited from {@link AbstractAffinityFunctionSelfTest}.
+ */
+public class FairAffinityFunctionSelfTest extends AbstractAffinityFunctionSelfTest {
+ /** {@inheritDoc} */
+ @Override protected AffinityFunction affinityFunction() {
+ return new FairAffinityFunction();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionNodesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionNodesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionNodesSelfTest.java
deleted file mode 100644
index cf57b66..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionNodesSelfTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity.fair;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.TreeSet;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Tests partition fair affinity in real grid.
- */
-public class GridFairAffinityFunctionNodesSelfTest extends GridCommonAbstractTest {
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Number of backups. */
- private int backups;
-
- /** Number of partitions. */
- private int parts = 512;
-
- /** Add nodes test. */
- private static final boolean[] ADD_ONLY = new boolean[] {true, true, true, true, true, true};
-
- /** Add nodes test. */
- private static final boolean[] ADD_REMOVE = new boolean[]
- {
- true, true, true, true, true, true,
- false, false, false, false, false
- };
-
- /** */
- private static final boolean[] MIXED1 = new boolean[]
- {
- // 1 2 3 2 3 4 3 4 5 4 3 2
- true, true, true, false, true, true, false, true, true, false, false, false
- };
-
- /** */
- private static final boolean[] MIXED2 = new boolean[]
- {
- // 1 2 3 2 1 2 1 2 3 2 1 2
- true, true, true, false, false, true, false, true, true, false, false, true
- };
-
- /** */
- private static final boolean[] MIXED3 = new boolean[]
- {
- // 1 2 3 4 5 6 5 6 7 8 9 8 7 8 9
- true, true, true, true, true, true, false, true, true, true, true, false, false, true, true,
- // 8 7 6
- false, false, false
- };
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- CacheConfiguration ccfg = cacheConfiguration();
-
- cfg.setCacheConfiguration(ccfg);
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoSpi);
-
- return cfg;
- }
-
- /**
- * @return Cache configuration.
- */
- private CacheConfiguration cacheConfiguration() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setBackups(backups);
-
- cfg.setCacheMode(CacheMode.PARTITIONED);
-
- cfg.setNearConfiguration(null);
-
- cfg.setAffinity(new FairAffinityFunction(parts));
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAdd() throws Exception {
- checkSequence(ADD_ONLY);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAddRemove() throws Exception {
- checkSequence(ADD_REMOVE);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMixed1() throws Exception {
- checkSequence(MIXED1);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMixed2() throws Exception {
- checkSequence(MIXED2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMixed3() throws Exception {
- checkSequence(MIXED3);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void checkSequence(boolean[] seq) throws Exception {
- for (int b = 0; b < 3; b++) {
- backups = b;
-
- info(">>>>>>>>>>>>>>>> Checking backups: " + backups);
-
- checkSequence0(seq);
-
- info(">>>>>>>>>>>>>>>> Finished check: " + backups);
- }
- }
-
- /**
- * @param seq Start/stop sequence.
- * @throws Exception If failed.
- */
- private void checkSequence0(boolean[] seq) throws Exception {
- try {
- startGrid(0);
-
- TreeSet<Integer> started = new TreeSet<>();
-
- started.add(0);
-
- int topVer = 1;
-
- for (boolean start : seq) {
- if (start) {
- int nextIdx = nextIndex(started);
-
- startGrid(nextIdx);
-
- started.add(nextIdx);
- }
- else {
- int idx = started.last();
-
- stopGrid(idx);
-
- started.remove(idx);
- }
-
- topVer++;
-
- info("Grid 0: " + grid(0).localNode().id());
-
- ((IgniteKernal)grid(0)).internalCache().context().affinity().affinityReadyFuture(topVer).get();
-
- for (int i : started) {
- if (i != 0) {
- IgniteEx grid = grid(i);
-
- ((IgniteKernal)grid).internalCache().context().affinity().affinityReadyFuture(topVer).get();
-
- info("Grid " + i + ": " + grid.localNode().id());
-
- for (int part = 0; part < parts; part++) {
- List<ClusterNode> firstNodes = (List<ClusterNode>)grid(0).affinity(null)
- .mapPartitionToPrimaryAndBackups(part);
-
- List<ClusterNode> secondNodes = (List<ClusterNode>)grid.affinity(null)
- .mapPartitionToPrimaryAndBackups(part);
-
- assertEquals(firstNodes.size(), secondNodes.size());
-
- for (int n = 0; n < firstNodes.size(); n++)
- assertEquals(firstNodes.get(n), secondNodes.get(n));
- }
- }
- }
- }
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * First positive integer that is not present in started set.
- *
- * @param started Already started indices.
- * @return First positive integer that is not present in started set.
- */
- private int nextIndex(Collection<Integer> started) {
- assert started.contains(0);
-
- for (int i = 1; i < 10000; i++) {
- if (!started.contains(i))
- return i;
- }
-
- throw new IllegalStateException();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionSelfTest.java
deleted file mode 100644
index e746b42..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/GridFairAffinityFunctionSelfTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity.fair;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
-import org.apache.ignite.testframework.GridTestNode;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class GridFairAffinityFunctionSelfTest extends GridCommonAbstractTest {
- /**
- * @throws Exception If failed.
- */
- public void testNodeRemovedNoBackups() throws Exception {
- checkNodeRemoved(0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testNodeRemovedOneBackup() throws Exception {
- checkNodeRemoved(1);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testNodeRemovedTwoBackups() throws Exception {
- checkNodeRemoved(2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testNodeRemovedThreeBackups() throws Exception {
- checkNodeRemoved(3);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRandomReassignmentNoBackups() throws Exception {
- checkRandomReassignment(0);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRandomReassignmentOneBackup() throws Exception {
- checkRandomReassignment(1);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRandomReassignmentTwoBackups() throws Exception {
- checkRandomReassignment(2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRandomReassignmentThreeBackups() throws Exception {
- checkRandomReassignment(3);
- }
-
- /**
- * @throws Exception If failed.
- */
- private void checkNodeRemoved(int backups) throws Exception {
- int parts = 256;
-
- AffinityFunction aff = new FairAffinityFunction(parts);
-
- int nodesCnt = 50;
-
- List<ClusterNode> nodes = new ArrayList<>(nodesCnt);
-
- List<List<ClusterNode>> prev = null;
-
- for (int i = 0; i < nodesCnt; i++) {
- info("======================================");
- info("Assigning partitions: " + i);
- info("======================================");
-
- ClusterNode node = new GridTestNode(UUID.randomUUID());
-
- nodes.add(node);
-
- DiscoveryEvent discoEvt = new DiscoveryEvent(node, "", EventType.EVT_NODE_JOINED,
- node);
-
- List<List<ClusterNode>> assignment = aff.assignPartitions(
- new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
- backups));
-
- info("Assigned.");
-
- verifyAssignment(assignment, backups, parts, nodes.size());
-
- prev = assignment;
- }
-
- info("======================================");
- info("Will remove nodes.");
- info("======================================");
-
- for (int i = 0; i < nodesCnt - 1; i++) {
- info("======================================");
- info("Assigning partitions: " + i);
- info("======================================");
-
- ClusterNode rmv = nodes.remove(nodes.size() - 1);
-
- DiscoveryEvent discoEvt = new DiscoveryEvent(rmv, "", EventType.EVT_NODE_LEFT, rmv);
-
- List<List<ClusterNode>> assignment = aff.assignPartitions(
- new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
- backups));
-
- info("Assigned.");
-
- verifyAssignment(assignment, backups, parts, nodes.size());
-
- prev = assignment;
- }
- }
-
- @SuppressWarnings("IfMayBeConditional")
- private void checkRandomReassignment(int backups) {
- int parts = 256;
-
- AffinityFunction aff = new FairAffinityFunction(parts);
-
- Random rnd = new Random();
-
- int maxNodes = 50;
-
- List<ClusterNode> nodes = new ArrayList<>(maxNodes);
-
- List<List<ClusterNode>> prev = null;
-
- int state = 0;
-
- int i = 0;
-
- while (true) {
- boolean add;
-
- if (nodes.size() < 2) {
- // Returned back to one node?
- if (state == 1)
- return;
-
- add = true;
- }
- else if (nodes.size() == maxNodes) {
- if (state == 0)
- state = 1;
-
- add = false;
- }
- else {
- // Nodes size in [2, maxNodes - 1].
- if (state == 0)
- add = rnd.nextInt(3) != 0; // 66% to add, 33% to remove.
- else
- add = rnd.nextInt(3) == 0; // 33% to add, 66% to remove.
- }
-
- DiscoveryEvent discoEvt;
-
- if (add) {
- ClusterNode addedNode = new GridTestNode(UUID.randomUUID());
-
- nodes.add(addedNode);
-
- discoEvt = new DiscoveryEvent(addedNode, "", EventType.EVT_NODE_JOINED, addedNode);
- }
- else {
- ClusterNode rmvNode = nodes.remove(rnd.nextInt(nodes.size()));
-
- discoEvt = new DiscoveryEvent(rmvNode, "", EventType.EVT_NODE_LEFT, rmvNode);
- }
-
- info("======================================");
- info("Assigning partitions [iter=" + i + ", discoEvt=" + discoEvt + ", nodesSize=" + nodes.size() + ']');
- info("======================================");
-
- List<List<ClusterNode>> assignment = aff.assignPartitions(
- new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
- backups));
-
- verifyAssignment(assignment, backups, parts, nodes.size());
-
- prev = assignment;
-
- i++;
- }
- }
-
- /**
- * @param assignment Assignment to verify.
- */
- private void verifyAssignment(List<List<ClusterNode>> assignment, int keyBackups, int partsCnt, int topSize) {
- Map<UUID, Collection<Integer>> mapping = new HashMap<>();
-
- int ideal = Math.round((float)partsCnt / topSize * Math.min(keyBackups + 1, topSize));
-
- for (int part = 0; part < assignment.size(); part++) {
- for (ClusterNode node : assignment.get(part)) {
- assert node != null;
-
- Collection<Integer> parts = mapping.get(node.id());
-
- if (parts == null) {
- parts = new HashSet<>();
-
- mapping.put(node.id(), parts);
- }
-
- assertTrue(parts.add(part));
- }
- }
-
- int max = -1, min = Integer.MAX_VALUE;
-
- for (Collection<Integer> parts : mapping.values()) {
- max = Math.max(max, parts.size());
- min = Math.min(min, parts.size());
- }
-
- log().warning("max=" + max + ", min=" + min + ", ideal=" + ideal + ", minDev=" + deviation(min, ideal) + "%, " +
- "maxDev=" + deviation(max, ideal) + "%");
- }
-
- private static int deviation(int val, int ideal) {
- return Math.round(Math.abs(((float)val - ideal) / ideal * 100));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
deleted file mode 100644
index eb1c455..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/IgniteFairAffinityDynamicCacheSelfTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity.fair;
-
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class IgniteFairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- public IgniteFairAffinityDynamicCacheSelfTest(){
- super(false);
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setIpFinder(IP_FINDER);
-
- cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
-
- cfg.setDiscoverySpi(disco);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- startGridsMultiThreaded(3);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartStopCache() throws Exception {
- CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
-
- cacheCfg.setCacheMode(CacheMode.PARTITIONED);
- cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
- cacheCfg.setBackups(1);
- cacheCfg.setName("test");
- cacheCfg.setAffinity(new FairAffinityFunction());
-
- final IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg);
-
- for (int i = 0; i < 10_000; i++)
- cache.put(i, i);
-
- IgniteInternalFuture<Object> destFut = GridTestUtils.runAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- ignite(0).destroyCache(cache.getName());
-
- return null;
- }
- });
-
- destFut.get(2000L);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
new file mode 100644
index 0000000..d5d8b8f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionBackupFilterSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionBackupFilterAbstractSelfTest;
+
+/**
+ * Tests backup filter for {@link RendezvousAffinityFunction}.
+ */
+public class RendezvousAffinityFunctionBackupFilterSelfTest extends AffinityFunctionBackupFilterAbstractSelfTest {
+ /** {@inheritDoc} Returns a {@link RendezvousAffinityFunction} with {@code backupFilter} set on it. */
+ @Override protected AffinityFunction affinityFunction() {
+ RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false); // 'excludeNeighbors' flag is off.
+
+ aff.setBackupFilter(backupFilter); // 'backupFilter' is not declared here; presumably inherited from the base test.
+
+ return aff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
new file mode 100644
index 0000000..ea47c68
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionExcludeNeighborsAbstractSelfTest;
+
+/**
+ * Tests exclude neighbors flag for {@link RendezvousAffinityFunction}.
+ */
+public class RendezvousAffinityFunctionExcludeNeighborsSelfTest extends
+ AffinityFunctionExcludeNeighborsAbstractSelfTest {
+ /** {@inheritDoc} Returns a new {@link RendezvousAffinityFunction} with the flag under test switched on. */
+ @Override protected AffinityFunction affinityFunction() {
+ return new RendezvousAffinityFunction(true); // 'excludeNeighbors' flag is on.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSelfTest.java
new file mode 100644
index 0000000..d895315
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionSelfTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.AbstractAffinityFunctionSelfTest;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Tests for {@link RendezvousAffinityFunction}, supplied to {@link AbstractAffinityFunctionSelfTest} via {@link #affinityFunction()}.
+ */
+public class RendezvousAffinityFunctionSelfTest extends AbstractAffinityFunctionSelfTest {
+ /** Node started in {@link #beforeTestsStarted()}; written into the "ignite" field of each function under test. */
+ private static Ignite ignite;
+
+ /** {@inheritDoc} Starts one grid and keeps the reference in {@link #ignite}. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ ignite = startGrid();
+ }
+
+ /** {@inheritDoc} Stops all grids started by this test. */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} Returns a new {@link RendezvousAffinityFunction} whose "ignite" field is set to {@link #ignite}. */
+ @Override protected AffinityFunction affinityFunction() {
+ AffinityFunction aff = new RendezvousAffinityFunction();
+
+ GridTestUtils.setFieldValue(aff, "ignite", ignite); // Field is set by name; presumably injected by the node otherwise.
+
+ return aff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
index 1495a2b..cedb693 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
@@ -124,7 +124,7 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void _testCrossCacheTxOperationsFairAffinity() throws Exception {
+ public void testCrossCacheTxOperationsFairAffinity() throws Exception {
txOperations(PARTITIONED, FULL_SYNC, true, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index a8d025c..3e12ebf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -96,8 +97,12 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst
CacheConfiguration cacheCfg = cache(gridName, null, 0);
+ cacheCfg.setAffinity(new RendezvousAffinityFunction());
+
CacheConfiguration cacheBackupCfg = cache(gridName, BACKUP_CACHE, 2);
+ cacheBackupCfg.setAffinity(new RendezvousAffinityFunction());
+
cfg.setCacheConfiguration(cacheCfg, cacheBackupCfg);
TcpDiscoverySpi spi = new TcpDiscoverySpi();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
index 31e34bb..e28e89f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
@@ -806,23 +806,6 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac
public void testAffinityForReplicatedCache() throws Exception {
cacheEnabled = true;
- aff = new FairAffinityFunction(); // Check cannot use FairAffinityFunction.
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- return startGrid(1);
- }
- }, IgniteCheckedException.class, null);
-
- aff = new RendezvousAffinityFunction(true); // Check cannot set 'excludeNeighbors' flag.
- backups = Integer.MAX_VALUE;
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- return startGrid(1);
- }
- }, IgniteCheckedException.class, null);
-
aff = new RendezvousAffinityFunction(false, 100);
startGrid(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java
deleted file mode 100644
index cb8abec..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionedAffinityFilterSelfTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.Collection;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-
-/**
- * Partitioned affinity test.
- */
-@SuppressWarnings({"PointlessArithmeticExpression"})
-public class GridCachePartitionedAffinityFilterSelfTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Backup count. */
- private static final int BACKUPS = 1;
-
- /** Split attribute name. */
- private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
-
- /** Split attribute value. */
- private String splitAttrVal;
-
- /** Test backup filter. */
- private static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
- new IgniteBiPredicate<ClusterNode, ClusterNode>() {
- @Override public boolean apply(ClusterNode primary, ClusterNode backup) {
- assert primary != null : "primary is null";
- assert backup != null : "backup is null";
-
- return !F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME));
- }
- };
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- RendezvousAffinityFunction aff = new RendezvousAffinityFunction();
-
- aff.setBackupFilter(backupFilter);
-
- CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
- cacheCfg.setCacheMode(PARTITIONED);
- cacheCfg.setBackups(BACKUPS);
- cacheCfg.setAffinity(aff);
- cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
- cacheCfg.setRebalanceMode(SYNC);
- cacheCfg.setAtomicityMode(TRANSACTIONAL);
-
- TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
- spi.setIpFinder(IP_FINDER);
-
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setCacheConfiguration(cacheCfg);
- cfg.setDiscoverySpi(spi);
-
- cfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionDistribution() throws Exception {
- try {
- for (int i = 0; i < 3; i++) {
- splitAttrVal = "A";
-
- startGrid(2 * i);
-
- splitAttrVal = "B";
-
- startGrid(2 * i + 1);
-
- awaitPartitionMapExchange();
-
- checkPartitions();
- }
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- private void checkPartitions() throws Exception {
- int partCnt = RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
-
- AffinityFunction aff = cacheConfiguration(grid(0).configuration(), null).getAffinity();
-
- IgniteCache<Object, Object> cache = grid(0).cache(null);
-
- for (int i = 0; i < partCnt; i++) {
- assertEquals(i, aff.partition(i));
-
- Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
-
- assertEquals(2, nodes.size());
-
- ClusterNode primary = F.first(nodes);
- ClusterNode backup = F.last(nodes);
-
- assertFalse(F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME)));
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java
index fa04b6b..c12e1ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -76,6 +77,8 @@ public class GridCacheDhtPreloadPutGetSelfTest extends GridCommonAbstractTest {
cacheCfg.setRebalanceMode(preloadMode);
cacheCfg.setBackups(backups);
+ cacheCfg.setAffinity(new RendezvousAffinityFunction());
+
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(ipFinder);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java
deleted file mode 100644
index 5b66174..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
-
-/**
- * Partitioned affinity test.
- */
-@SuppressWarnings({"PointlessArithmeticExpression", "FieldCanBeLocal"})
-public abstract class GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest extends GridCommonAbstractTest {
- /** Number of backups. */
- private int backups = 2;
-
- /** */
- private int gridInstanceNum;
-
- /** */
- private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
- IgniteConfiguration c = super.getConfiguration(gridName);
-
- // Override node attributes in discovery spi.
- TcpDiscoverySpi spi = new TcpDiscoverySpi() {
- @Override public void setNodeAttributes(Map<String, Object> attrs,
- IgniteProductVersion ver) {
- super.setNodeAttributes(attrs, ver);
-
- // Set unique mac addresses for every group of three nodes.
- String macAddrs = "MOCK_MACS_" + (gridInstanceNum / 3);
-
- attrs.put(IgniteNodeAttributes.ATTR_MACS, macAddrs);
-
- gridInstanceNum++;
- }
- };
-
- spi.setIpFinder(ipFinder);
-
- c.setDiscoverySpi(spi);
-
- CacheConfiguration cc = defaultCacheConfiguration();
-
- cc.setCacheMode(PARTITIONED);
-
- cc.setBackups(backups);
-
- cc.setAffinity(affinityFunction());
-
- cc.setRebalanceMode(NONE);
-
- c.setCacheConfiguration(cc);
-
- return c;
- }
-
- /**
- * @return Affinity function for test.
- */
- protected abstract AffinityFunction affinityFunction();
-
- /**
- * @param aff Affinity.
- * @param key Key.
- * @return Nodes.
- */
- private static Collection<? extends ClusterNode> nodes(Affinity<Object> aff, Object key) {
- return aff.mapKeyToPrimaryAndBackups(key);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAffinityMultiNode() throws Exception {
- int grids = 9;
-
- startGrids(grids);
-
- try {
- Object key = 12345;
-
- int copies = backups + 1;
-
- for (int i = 0; i < grids; i++) {
- final Ignite g = grid(i);
-
- Affinity<Object> aff = g.affinity(null);
-
- List<TcpDiscoveryNode> top = new ArrayList<>();
-
- for (ClusterNode node : g.cluster().nodes())
- top.add((TcpDiscoveryNode) node);
-
- Collections.sort(top);
-
- assertEquals(grids, top.size());
-
- int idx = 1;
-
- for (ClusterNode n : top) {
- assertEquals(idx, n.order());
-
- idx++;
- }
-
- Collection<? extends ClusterNode> affNodes = nodes(aff, key);
-
- info("Affinity picture for grid [i=" + i + ", aff=" + U.toShortString(affNodes));
-
- assertEquals(copies, affNodes.size());
-
- Set<String> macs = new HashSet<>();
-
- for (ClusterNode node : affNodes)
- macs.add((String)node.attribute(IgniteNodeAttributes.ATTR_MACS));
-
- assertEquals(copies, macs.size());
- }
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testAffinitySingleNode() throws Exception {
- Ignite g = startGrid();
-
- try {
- Object key = 12345;
-
- Collection<? extends ClusterNode> affNodes = nodes(g.affinity(null), key);
-
- info("Affinity picture for grid: " + U.toShortString(affNodes));
-
- assertEquals(1, affNodes.size());
- }
- finally {
- stopAllGrids();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest.java
deleted file mode 100644
index f26a1ef..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-
-/**
- * Tests exclude neighbors flag for rendezvous affinity function.
- */
-public class GridCacheRendezvousAffinityFunctionExcludeNeighborsSelfTest extends
- GridCacheAffinityFunctionExcludeNeighborsAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected AffinityFunction affinityFunction() {
- return new RendezvousAffinityFunction(true);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..418449a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Multi-node tests for replicated cache with {@link FairAffinityFunction} configured to exclude neighbors.
+ */
+public class CacheReplicatedFairAffinityExcludeNeighborsMultiNodeFullApiSelfTest
+ extends GridCacheReplicatedMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setAffinity(new FairAffinityFunction(true));
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..ea65913
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedFairAffinityMultiNodeFullApiSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Multi-node tests for replicated cache with {@link FairAffinityFunction}.
+ */
+public class CacheReplicatedFairAffinityMultiNodeFullApiSelfTest extends GridCacheReplicatedMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setAffinity(new FairAffinityFunction(false));
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..66aeefe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityExcludeNeighborsMultiNodeFullApiSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Multi-node tests for replicated cache with {@link RendezvousAffinityFunction} configured to exclude neighbors.
+ */
+public class CacheReplicatedRendezvousAffinityExcludeNeighborsMultiNodeFullApiSelfTest
+ extends GridCacheReplicatedMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setAffinity(new RendezvousAffinityFunction(true));
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..c6900a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/CacheReplicatedRendezvousAffinityMultiNodeFullApiSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Multi-node tests for replicated cache with {@link RendezvousAffinityFunction}.
+ */
+public class CacheReplicatedRendezvousAffinityMultiNodeFullApiSelfTest
+ extends GridCacheReplicatedMultiNodeFullApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setAffinity(new RendezvousAffinityFunction());
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
index ff53250..78f82ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
@@ -76,6 +76,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapTieredFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOffHeapTieredMultiNodeFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.CacheReplicatedFairAffinityExcludeNeighborsMultiNodeFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.CacheReplicatedFairAffinityMultiNodeFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.CacheReplicatedRendezvousAffinityExcludeNeighborsMultiNodeFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.CacheReplicatedRendezvousAffinityMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCachePartitionedFairAffinityMultiNodeFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicMultiNodeFullApiSelfTest;
@@ -174,6 +178,10 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest.class);
suite.addTestSuite(GridCacheAtomicNearOnlyMultiNodeP2PDisabledFullApiSelfTest.class);
+ suite.addTestSuite(CacheReplicatedFairAffinityExcludeNeighborsMultiNodeFullApiSelfTest.class);
+ suite.addTestSuite(CacheReplicatedFairAffinityMultiNodeFullApiSelfTest.class);
+ suite.addTestSuite(CacheReplicatedRendezvousAffinityExcludeNeighborsMultiNodeFullApiSelfTest.class);
+ suite.addTestSuite(CacheReplicatedRendezvousAffinityMultiNodeFullApiSelfTest.class);
suite.addTestSuite(GridCachePartitionedFairAffinityMultiNodeFullApiSelfTest.class);
suite.addTestSuite(GridCachePartitionedNearDisabledFairAffinityMultiNodeFullApiSelfTest.class);
suite.addTestSuite(GridCacheAtomicFairAffinityMultiNodeFullApiSelfTest.class);
[3/3] ignite git commit: IGNITE-1016 - Exclude neighbors flag for
fair affinity. Fixes #80.
Posted by ag...@apache.org.
IGNITE-1016 - Exclude neighbors flag for fair affinity. Fixes #80.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f5a4c9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f5a4c9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f5a4c9d
Branch: refs/heads/ignite-1016
Commit: 8f5a4c9de05a98d2927c602273c967b3655c646d
Parents: 6f3ef6a
Author: Andrey Gura <ag...@gridgain.com>
Authored: Wed Sep 23 18:12:19 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Sep 23 18:12:19 2015 -0700
----------------------------------------------------------------------
.../ClientAbstractMultiThreadedSelfTest.java | 3 +-
.../affinity/fair/FairAffinityFunction.java | 497 ++++++++++++++-----
.../rendezvous/RendezvousAffinityFunction.java | 140 ++----
.../processors/cache/GridCacheProcessor.java | 13 -
.../processors/cache/GridCacheUtils.java | 52 +-
.../AbstractAffinityFunctionSelfTest.java | 293 +++++++++++
.../affinity/AffinityClientNodeSelfTest.java | 194 ++++++++
...ityFunctionBackupFilterAbstractSelfTest.java | 138 +++++
...unctionExcludeNeighborsAbstractSelfTest.java | 182 +++++++
.../affinity/IgniteClientNodeAffinityTest.java | 194 --------
.../fair/FairAffinityDynamicCacheSelfTest.java | 97 ++++
...airAffinityFunctionBackupFilterSelfTest.java | 35 ++
...ffinityFunctionExcludeNeighborsSelfTest.java | 31 ++
.../fair/FairAffinityFunctionNodesSelfTest.java | 245 +++++++++
.../fair/FairAffinityFunctionSelfTest.java | 31 ++
.../GridFairAffinityFunctionNodesSelfTest.java | 245 ---------
.../fair/GridFairAffinityFunctionSelfTest.java | 270 ----------
.../IgniteFairAffinityDynamicCacheSelfTest.java | 97 ----
...ousAffinityFunctionBackupFilterSelfTest.java | 35 ++
...ffinityFunctionExcludeNeighborsSelfTest.java | 32 ++
.../RendezvousAffinityFunctionSelfTest.java | 50 ++
.../cache/CrossCacheTxRandomOperationsTest.java | 2 +-
.../GridCacheAbstractLocalStoreSelfTest.java | 5 +
...idCacheConfigurationConsistencySelfTest.java | 17 -
...dCachePartitionedAffinityFilterSelfTest.java | 143 ------
.../dht/GridCacheDhtPreloadPutGetSelfTest.java | 3 +
...unctionExcludeNeighborsAbstractSelfTest.java | 184 -------
...ffinityFunctionExcludeNeighborsSelfTest.java | 32 --
...xcludeNeighborsMultiNodeFullApiSelfTest.java | 36 ++
...tedFairAffinityMultiNodeFullApiSelfTest.java | 35 ++
...xcludeNeighborsMultiNodeFullApiSelfTest.java | 36 ++
...dezvousAffinityMultiNodeFullApiSelfTest.java | 36 ++
.../IgniteCacheFullApiSelfTestSuite.java | 8 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 16 +-
.../testsuites/IgniteCacheTestSuite2.java | 12 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 4 +
36 files changed, 2022 insertions(+), 1421 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
index 9dd4d83..9f6bf2b 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java
@@ -254,8 +254,7 @@ public abstract class ClientAbstractMultiThreadedSelfTest extends GridCommonAbst
final ConcurrentLinkedQueue<String> execQueue = new ConcurrentLinkedQueue<>();
IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
long processed;
while ((processed = cnt.getAndIncrement()) < taskExecutionCount()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
index cc04875..b42b683 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.RandomAccess;
import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
@@ -38,15 +39,38 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
/**
* Fair affinity function which tries to ensure that all nodes get equal number of partitions with
* minimum amount of reassignments between existing nodes.
+ * This function supports the following configuration:
+ * <ul>
+ * <li>
+ * {@code partitions} - Number of partitions to spread across nodes.
+ * </li>
+ * <li>
+ * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
+ * from being backups of each other. This flag can be ignored in cases when topology does not have enough nodes
+ * to assign backups.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ * </li>
+ * <li>
+ * {@code backupFilter} - Optional filter for backup nodes. If provided, then only
+ * nodes that pass this filter will be selected as backup nodes. If not provided, then
+ * primary and backup nodes will be selected out of all nodes available for this cache.
+ * </li>
+ * </ul>
* <p>
- * Cache affinity can be configured for individual caches via
- * {@link CacheConfiguration#setAffinity(AffinityFunction)} method.
+ * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method.
*/
@AffinityCentralizedFunction
public class FairAffinityFunction implements AffinityFunction {
@@ -62,21 +86,165 @@ public class FairAffinityFunction implements AffinityFunction {
/** Descending comparator. */
private static final Comparator<PartitionSet> DESC_CMP = Collections.reverseOrder(ASC_CMP);
- /** */
- private final int parts;
+ /** Number of partitions. */
+ private int parts;
+
+ /** Exclude neighbors flag. */
+ private boolean exclNeighbors;
+
+ /** Exclude neighbors warning. */
+ private transient boolean exclNeighborsWarn;
+
+ /** Logger instance. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
+ /** Optional backup filter. First node is primary, second node is a node being tested. */
+ private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
/**
- * Creates fair affinity with default partition count.
+ * Empty constructor with all defaults.
*/
public FairAffinityFunction() {
- this(DFLT_PART_CNT);
+ this(false);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+ * and default number of partitions.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ */
+ public FairAffinityFunction(boolean exclNeighbors) {
+ this(exclNeighbors, DFLT_PART_CNT);
}
/**
* @param parts Number of partitions.
*/
public FairAffinityFunction(int parts) {
+ this(false, parts);
+ }
+
+ /**
+ * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
+ * and specified number of partitions.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
+ * of each other.
+ * @param parts Total number of partitions.
+ */
+ public FairAffinityFunction(boolean exclNeighbors, int parts) {
+ this(exclNeighbors, parts, null);
+ }
+
+ /**
+ * Initializes affinity with specified number of partitions and optional backup filter.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param parts Total number of partitions.
+ * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
+ * from all nodes that pass this filter. First argument for this filter is primary node, and second
+ * argument is node being tested.
+ */
+ public FairAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this(false, parts, backupFilter);
+ }
+
+ /**
+ * Private constructor.
+ *
+ * @param exclNeighbors Exclude neighbors flag.
+ * @param parts Partitions count.
+ * @param backupFilter Backup filter.
+ */
+ private FairAffinityFunction(boolean exclNeighbors, int parts,
+ IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ A.ensure(parts > 0, "parts > 0");
+
+ this.exclNeighbors = exclNeighbors;
this.parts = parts;
+ this.backupFilter = backupFilter;
+ }
+
+ /**
+ * Gets total number of key partitions. To ensure that all partitions are
+ * equally distributed across all nodes, please make sure that this
+ * number is significantly larger than a number of nodes. Also, partition
+ * size should be relatively small. Try to avoid having partitions with more
+ * than quarter million keys.
+ * <p>
+ * Note that for fully replicated caches this method should always
+ * return {@code 1}.
+ *
+ * @return Total partition count.
+ */
+ public int getPartitions() {
+ return parts;
+ }
+
+ /**
+ * Sets total number of partitions.
+ *
+ * @param parts Total number of partitions.
+ */
+ public void setPartitions(int parts) {
+ this.parts = parts;
+ }
+
+
+ /**
+ * Gets optional backup filter. If not {@code null}, backups will be selected
+ * from all nodes that pass this filter. First node passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return Optional backup filter.
+ */
+ @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
+ return backupFilter;
+ }
+
+ /**
+ * Sets optional backup filter. If provided, then backups will be selected from all
+ * nodes that pass this filter. First node being passed to this filter is primary node,
+ * and second node is a node being tested.
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param backupFilter Optional backup filter.
+ */
+ public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
+ this.backupFilter = backupFilter;
+ }
+
+ /**
+ * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public boolean isExcludeNeighbors() {
+ return exclNeighbors;
+ }
+
+ /**
+ * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
+ * <p>
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ *
+ * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
+ */
+ public void setExcludeNeighbors(boolean exclNeighbors) {
+ this.exclNeighbors = exclNeighbors;
}
/** {@inheritDoc} */
@@ -89,14 +257,20 @@ public class FairAffinityFunction implements AffinityFunction {
return Collections.nCopies(parts, Collections.singletonList(primary));
}
- List<List<ClusterNode>> assignment = createCopy(ctx);
+ Map<UUID, Collection<ClusterNode>> neighborhoodMap = exclNeighbors
+ ? GridCacheUtils.neighbors(ctx.currentTopologySnapshot())
+ : null;
+
+ List<List<ClusterNode>> assignment = createCopy(ctx, neighborhoodMap);
+
+ int backups = ctx.backups();
- int tiers = Math.min(ctx.backups() + 1, topSnapshot.size());
+ int tiers = backups == Integer.MAX_VALUE ? topSnapshot.size() : Math.min(backups + 1, topSnapshot.size());
// Per tier pending partitions.
Map<Integer, Queue<Integer>> pendingParts = new HashMap<>();
- FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot);
+ FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot, neighborhoodMap);
for (int tier = 0; tier < tiers; tier++) {
// Check if this is a new tier and add pending partitions.
@@ -104,23 +278,32 @@ public class FairAffinityFunction implements AffinityFunction {
for (int part = 0; part < parts; part++) {
if (fullMap.assignments.get(part).size() < tier + 1) {
- if (pending == null) {
- pending = new LinkedList<>();
-
- pendingParts.put(tier, pending);
- }
+ if (pending == null)
+ pendingParts.put(tier, pending = new LinkedList<>());
if (!pending.contains(part))
pending.add(part);
-
}
}
// Assign pending partitions, if any.
- assignPending(tier, pendingParts, fullMap, topSnapshot);
+ assignPending(tier, pendingParts, fullMap, topSnapshot, false);
// Balance assignments.
- balance(tier, pendingParts, fullMap, topSnapshot);
+ boolean balanced = balance(tier, pendingParts, fullMap, topSnapshot, false);
+
+ if (!balanced && exclNeighbors) {
+ assignPending(tier, pendingParts, fullMap, topSnapshot, true);
+
+ balance(tier, pendingParts, fullMap, topSnapshot, true);
+
+ if (!exclNeighborsWarn) {
+ LT.warn(log, null, "Affinity function excludeNeighbors property is ignored " +
+ "because topology has no enough nodes to assign backups.");
+
+ exclNeighborsWarn = true;
+ }
+ }
}
return fullMap.assignments;
@@ -153,9 +336,14 @@ public class FairAffinityFunction implements AffinityFunction {
* @param pendingMap Pending partitions per tier.
* @param fullMap Full assignment map to modify.
* @param topSnapshot Topology snapshot.
+ * @param allowNeighbors Allow neighbors nodes for partition.
*/
- private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap,
- List<ClusterNode> topSnapshot) {
+ private void assignPending(int tier,
+ Map<Integer, Queue<Integer>> pendingMap,
+ FullAssignmentMap fullMap,
+ List<ClusterNode> topSnapshot,
+ boolean allowNeighbors)
+ {
Queue<Integer> pending = pendingMap.get(tier);
if (F.isEmpty(pending))
@@ -168,19 +356,18 @@ public class FairAffinityFunction implements AffinityFunction {
PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false);
// First iterate over underloaded nodes.
- assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false);
+ assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false, allowNeighbors);
if (!pending.isEmpty() && !underloadedNodes.isEmpty()) {
// Same, forcing updates.
- assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true);
+ assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true, allowNeighbors);
}
if (!pending.isEmpty())
- assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot);
-
- assert pending.isEmpty();
+ assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot, allowNeighbors);
- pendingMap.remove(tier);
+ if (pending.isEmpty())
+ pendingMap.remove(tier);
}
/**
@@ -192,6 +379,7 @@ public class FairAffinityFunction implements AffinityFunction {
* @param underloadedNodes Underloaded nodes.
* @param topSnapshot Topology snapshot.
* @param force {@code True} if partitions should be moved.
+ * @param allowNeighbors Allow neighbors nodes for partition.
*/
private void assignPendingToUnderloaded(
int tier,
@@ -199,7 +387,8 @@ public class FairAffinityFunction implements AffinityFunction {
FullAssignmentMap fullMap,
PrioritizedPartitionMap underloadedNodes,
Collection<ClusterNode> topSnapshot,
- boolean force) {
+ boolean force,
+ boolean allowNeighbors) {
Iterator<Integer> it = pendingMap.get(tier).iterator();
int ideal = parts / topSnapshot.size();
@@ -212,7 +401,7 @@ public class FairAffinityFunction implements AffinityFunction {
assert node != null;
- if (fullMap.assign(part, tier, node, force, pendingMap)) {
+ if (fullMap.assign(part, tier, node, pendingMap, force, allowNeighbors)) {
// We could add partition to partition map without forcing, remove partition from pending.
it.remove();
@@ -237,9 +426,10 @@ public class FairAffinityFunction implements AffinityFunction {
* @param pendingMap Pending partitions per tier.
* @param fullMap Full assignment map to modify.
* @param topSnapshot Topology snapshot.
+ * @param allowNeighbors Allow neighbors nodes for partition.
*/
private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap,
- FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) {
+ FullAssignmentMap fullMap, List<ClusterNode> topSnapshot, boolean allowNeighbors) {
Iterator<Integer> it = pendingMap.get(tier).iterator();
int idx = 0;
@@ -254,7 +444,7 @@ public class FairAffinityFunction implements AffinityFunction {
do {
ClusterNode node = topSnapshot.get(i);
- if (fullMap.assign(part, tier, node, false, pendingMap)) {
+ if (fullMap.assign(part, tier, node, pendingMap, false, allowNeighbors)) {
it.remove();
assigned = true;
@@ -270,7 +460,7 @@ public class FairAffinityFunction implements AffinityFunction {
do {
ClusterNode node = topSnapshot.get(i);
- if (fullMap.assign(part, tier, node, true, pendingMap)) {
+ if (fullMap.assign(part, tier, node, pendingMap, true, allowNeighbors)) {
it.remove();
assigned = true;
@@ -283,7 +473,7 @@ public class FairAffinityFunction implements AffinityFunction {
} while (i != idx);
}
- if (!assigned)
+ if (!assigned && (!exclNeighbors || exclNeighbors && allowNeighbors))
throw new IllegalStateException("Failed to find assignable node for partition.");
}
}
@@ -295,9 +485,10 @@ public class FairAffinityFunction implements AffinityFunction {
* @param pendingParts Pending partitions per tier.
* @param fullMap Full assignment map to modify.
* @param topSnapshot Topology snapshot.
+ * @param allowNeighbors Allow neighbors nodes for partition.
*/
- private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap,
- Collection<ClusterNode> topSnapshot) {
+ private boolean balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap,
+ Collection<ClusterNode> topSnapshot, boolean allowNeighbors) {
int idealPartCnt = parts / topSnapshot.size();
Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier);
@@ -313,7 +504,7 @@ public class FairAffinityFunction implements AffinityFunction {
boolean assigned = false;
for (PartitionSet underloaded : underloadedNodes.assignments()) {
- if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) {
+ if (fullMap.assign(part, tier, underloaded.node(), pendingParts, false, allowNeighbors)) {
// Size of partition sets has changed.
if (overloaded.size() <= idealPartCnt)
overloadedNodes.remove(overloaded.nodeId());
@@ -335,7 +526,7 @@ public class FairAffinityFunction implements AffinityFunction {
if (!assigned) {
for (PartitionSet underloaded : underloadedNodes.assignments()) {
- if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) {
+ if (fullMap.assign(part, tier, underloaded.node(), pendingParts, true, allowNeighbors)) {
// Size of partition sets has changed.
if (overloaded.size() <= idealPartCnt)
overloadedNodes.remove(overloaded.nodeId());
@@ -366,6 +557,8 @@ public class FairAffinityFunction implements AffinityFunction {
break;
}
while (true);
+
+ return underloadedNodes.isEmpty();
}
/**
@@ -393,9 +586,12 @@ public class FairAffinityFunction implements AffinityFunction {
* Creates copy of previous partition assignment.
*
* @param ctx Affinity function context.
+ * @param neighborhoodMap Neighbor nodes keyed by node ID, or {@code null} if neighbors are not excluded.
* @return Assignment copy and per node partition map.
*/
- private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx) {
+ private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx,
+ Map<UUID, Collection<ClusterNode>> neighborhoodMap)
+ {
DiscoveryEvent discoEvt = ctx.discoveryEvent();
UUID leftNodeId = (discoEvt == null || discoEvt.type() == EventType.EVT_NODE_JOINED)
@@ -411,26 +607,42 @@ public class FairAffinityFunction implements AffinityFunction {
if (partNodes == null)
partNodesCp = new ArrayList<>();
- else {
- if (leftNodeId == null) {
- partNodesCp = new ArrayList<>(partNodes.size() + 1); // Node joined.
+ else
+ partNodesCp = copyAssigments(neighborhoodMap, partNodes, leftNodeId);
- partNodesCp.addAll(partNodes);
- }
- else {
- partNodesCp = new ArrayList<>(partNodes.size());
+ cp.add(partNodesCp);
+ }
+
+ return cp;
+ }
+
+ /**
+ * @param neighborhoodMap Neighbors nodes grouped by target node.
+ * @param partNodes Partition nodes.
+ * @param leftNodeId Left node id.
+ */
+ private List<ClusterNode> copyAssigments(Map<UUID, Collection<ClusterNode>> neighborhoodMap,
+ List<ClusterNode> partNodes, UUID leftNodeId) {
+ final List<ClusterNode> partNodesCp = new ArrayList<>(partNodes.size());
+
+ for (ClusterNode node : partNodes) {
+ if (node.id().equals(leftNodeId))
+ continue;
+
+ boolean containsNeighbor = false;
- for (ClusterNode affNode : partNodes) {
- if (!affNode.id().equals(leftNodeId))
- partNodesCp.add(affNode);
+ if (neighborhoodMap != null)
+ containsNeighbor = F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return partNodesCp.contains(node);
}
- }
- }
+ });
- cp.add(partNodesCp);
+ if (!containsNeighbor)
+ partNodesCp.add(node);
}
- return cp;
+ return partNodesCp;
}
/**
@@ -512,59 +724,11 @@ public class FairAffinityFunction implements AffinityFunction {
}
/**
- * Constructs assignment map for specified tier.
- *
- * @param tier Tier number, -1 for all tiers altogether.
- * @param assignment Assignment to construct map from.
- * @param topSnapshot Topology snapshot.
- * @return Assignment map.
- */
- private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment,
- Collection<ClusterNode> topSnapshot) {
- Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();
-
- for (int part = 0; part < assignment.size(); part++) {
- List<ClusterNode> nodes = assignment.get(part);
-
- assert nodes instanceof RandomAccess;
-
- if (nodes.size() <= tier)
- continue;
-
- int start = tier < 0 ? 0 : tier;
- int end = tier < 0 ? nodes.size() : tier + 1;
-
- for (int i = start; i < end; i++) {
- ClusterNode n = nodes.get(i);
-
- PartitionSet set = tmp.get(n.id());
-
- if (set == null) {
- set = new PartitionSet(n);
-
- tmp.put(n.id(), set);
- }
-
- set.add(part);
- }
- }
-
- if (tmp.size() < topSnapshot.size()) {
- for (ClusterNode node : topSnapshot) {
- if (!tmp.containsKey(node.id()))
- tmp.put(node.id(), new PartitionSet(node));
- }
- }
-
- return tmp;
- }
-
- /**
* Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary
* maps consistent.
*/
@SuppressWarnings("unchecked")
- private static class FullAssignmentMap {
+ private class FullAssignmentMap {
/** Per-tier assignment maps. */
private Map<UUID, PartitionSet>[] tierMaps;
@@ -574,20 +738,28 @@ public class FairAffinityFunction implements AffinityFunction {
/** Resulting assignment. */
private List<List<ClusterNode>> assignments;
+ /** Neighborhood map. */
+ private final Map<UUID, Collection<ClusterNode>> neighborhoodMap;
+
/**
* @param tiers Number of tiers.
* @param assignments Assignments to modify.
* @param topSnapshot Topology snapshot.
+ * @param neighborhoodMap Neighbors nodes grouped by target node.
*/
- private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) {
+ private FullAssignmentMap(int tiers,
+ List<List<ClusterNode>> assignments,
+ Collection<ClusterNode> topSnapshot,
+ Map<UUID, Collection<ClusterNode>> neighborhoodMap)
+ {
this.assignments = assignments;
-
- tierMaps = new Map[tiers];
+ this.neighborhoodMap = neighborhoodMap;
+ this.tierMaps = new Map[tiers];
for (int tier = 0; tier < tiers; tier++)
- tierMaps[tier] = assignments(tier, assignments, topSnapshot);
+ tierMaps[tier] = assignments(tier, topSnapshot);
- fullMap = assignments(-1, assignments, topSnapshot);
+ fullMap = assignments(-1, topSnapshot);
}
/**
@@ -599,14 +771,20 @@ public class FairAffinityFunction implements AffinityFunction {
* @param part Partition to assign.
* @param tier Tier number to assign.
* @param node Node to move partition to.
- * @param force Force flag.
* @param pendingParts per tier pending partitions map.
+ * @param force Force flag.
+ * @param allowNeighbors {@code True} to allow assignment to a node whose neighbors already hold the partition.
* @return {@code True} if assignment succeeded.
*/
- boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) {
+ boolean assign(int part,
+ int tier,
+ ClusterNode node,
+ Map<Integer, Queue<Integer>> pendingParts, boolean force,
+ boolean allowNeighbors)
+ {
UUID nodeId = node.id();
- if (!fullMap.get(nodeId).contains(part)) {
+ if (isAssignable(part, tier, node, allowNeighbors)) {
tierMaps[tier].get(nodeId).add(part);
fullMap.get(nodeId).add(part);
@@ -656,11 +834,8 @@ public class FairAffinityFunction implements AffinityFunction {
Queue<Integer> pending = pendingParts.get(t);
- if (pending == null) {
- pending = new LinkedList<>();
-
- pendingParts.put(t, pending);
- }
+ if (pending == null)
+ pendingParts.put(t, pending = new LinkedList<>());
pending.add(part);
@@ -668,7 +843,7 @@ public class FairAffinityFunction implements AffinityFunction {
}
}
- throw new IllegalStateException("Unable to assign partition to node while force is true.");
+ return false;
}
// !force.
@@ -684,6 +859,102 @@ public class FairAffinityFunction implements AffinityFunction {
public Map<UUID, PartitionSet> tierMapping(int tier) {
return tierMaps[tier];
}
+
+ /**
+ * @param part Partition.
+ * @param tier Tier.
+ * @param node Node.
+ * @param allowNeighbors {@code True} to allow a node whose neighbors already hold the partition.
+ */
+ private boolean isAssignable(int part, int tier, final ClusterNode node, boolean allowNeighbors) {
+ if (containsPartition(part, node))
+ return false;
+
+ if (exclNeighbors)
+ return allowNeighbors || !neighborsContainPartition(node, part);
+ else if (backupFilter == null)
+ return true;
+ else {
+ if (tier == 0) {
+ List<ClusterNode> assigment = assignments.get(part);
+
+ assert assigment.size() > 0;
+
+ List<ClusterNode> backups = assigment.subList(1, assigment.size());
+
+ return !F.exist(backups, new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode n) {
+ return !backupFilter.apply(node, n);
+ }
+ });
+ }
+ else
+ return (backupFilter.apply(assignments.get(part).get(0), node));
+ }
+ }
+
+ /**
+ * @param part Partition.
+ * @param node Node.
+ */
+ private boolean containsPartition(int part, ClusterNode node) {
+ return fullMap.get(node.id()).contains(part);
+ }
+
+ /**
+ * @param node Node.
+ * @param part Partition.
+ */
+ private boolean neighborsContainPartition(ClusterNode node, final int part) {
+ return F.exist(neighborhoodMap.get(node.id()), new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode n) {
+ return fullMap.get(n.id()).contains(part);
+ }
+ });
+ }
+
+ /**
+ * Constructs assignments map for specified tier.
+ *
+ * @param tier Tier number, -1 for all tiers altogether.
+ * @param topSnapshot Topology snapshot.
+ * @return Assignment map.
+ */
+ private Map<UUID, PartitionSet> assignments(int tier, Collection<ClusterNode> topSnapshot) {
+ Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();
+
+ for (int part = 0; part < assignments.size(); part++) {
+ List<ClusterNode> nodes = assignments.get(part);
+
+ assert nodes instanceof RandomAccess;
+
+ if (nodes.size() <= tier)
+ continue;
+
+ int start = tier < 0 ? 0 : tier;
+ int end = tier < 0 ? nodes.size() : tier + 1;
+
+ for (int i = start; i < end; i++) {
+ ClusterNode n = nodes.get(i);
+
+ PartitionSet set = tmp.get(n.id());
+
+ if (set == null)
+ tmp.put(n.id(), set = new PartitionSet(n));
+
+ set.add(part);
+ }
+ }
+
+ if (tmp.size() < topSnapshot.size()) {
+ for (ClusterNode node : topSnapshot) {
+ if (!tmp.containsKey(node.id()))
+ tmp.put(node.id(), new PartitionSet(node));
+ }
+ }
+
+ return tmp;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index fd07eb9..61a21d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -29,26 +29,28 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.AffinityNodeHashResolver;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -60,8 +62,9 @@ import org.jetbrains.annotations.Nullable;
* </li>
* <li>
* {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
- * from being backups of each other. Note that {@code backupFilter} is ignored if
- * {@code excludeNeighbors} is set to {@code true}.
+ * from being backups of each other. This flag can be ignored in cases when topology does not have enough nodes
+ * to assign backups.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
* </li>
* <li>
* {@code backupFilter} - Optional filter for back up nodes. If provided, then only
@@ -70,7 +73,7 @@ import org.jetbrains.annotations.Nullable;
* </li>
* </ul>
* <p>
- * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method.
+ * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method.
*/
public class RendezvousAffinityFunction implements AffinityFunction, Externalizable {
/** */
@@ -80,8 +83,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
public static final int DFLT_PARTITION_COUNT = 1024;
/** Comparator. */
- private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR =
- new HashComparator();
+ private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
/** Thread local message digest. */
private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
@@ -92,8 +94,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
catch (NoSuchAlgorithmException e) {
assert false : "Should have failed in constructor";
- throw new IgniteException("Failed to obtain message digest (digest was available in constructor)",
- e);
+ throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
}
}
};
@@ -104,6 +105,9 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/** Exclude neighbors flag. */
private boolean exclNeighbors;
+ /** Exclude neighbors warning. */
+ private transient boolean exclNeighborsWarn;
+
/** Optional backup filter. First node is primary, second node is a node being tested. */
private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;
@@ -114,6 +118,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
@IgniteInstanceResource
private Ignite ignite;
+ /** Logger instance. */
+ @LoggerResource
+ private transient IgniteLogger log;
+
/**
* Empty constructor with all defaults.
*/
@@ -125,7 +133,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
* Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
* and specified number of backups.
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
* of each other.
@@ -138,7 +146,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
* Initializes affinity with flag to exclude same-host-neighbors from being backups of each other,
* and specified number of backups and partitions.
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
* of each other.
@@ -151,14 +159,14 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/**
* Initializes optional counts for replicas and backups.
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param parts Total number of partitions.
* @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
* from all nodes that pass this filter. First argument for this filter is primary node, and second
* argument is node being tested.
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*/
public RendezvousAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
this(false, parts, backupFilter);
@@ -173,7 +181,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
*/
private RendezvousAffinityFunction(boolean exclNeighbors, int parts,
IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
- A.ensure(parts != 0, "parts != 0");
+ A.ensure(parts > 0, "parts > 0");
this.exclNeighbors = exclNeighbors;
this.parts = parts;
@@ -253,7 +261,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
* from all nodes that pass this filter. First node passed to this filter is primary node,
* and second node is a node being tested.
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @return Optional backup filter.
*/
@@ -266,7 +274,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
* nodes that pass this filter. First node being passed to this filter is primary node,
* and second node is a node being tested.
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param backupFilter Optional backup filter.
*/
@@ -277,7 +285,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/**
* Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @return {@code True} if nodes residing on the same host may not act as backups of each other.
*/
@@ -288,7 +296,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
/**
* Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
* <p>
- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set.
+ * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
*
* @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other.
*/
@@ -355,20 +363,9 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
Collections.sort(lst, COMPARATOR);
- int primaryAndBackups;
-
- List<ClusterNode> res;
-
- if (backups == Integer.MAX_VALUE) {
- primaryAndBackups = Integer.MAX_VALUE;
-
- res = new ArrayList<>();
- }
- else {
- primaryAndBackups = backups + 1;
+ int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
- res = new ArrayList<>(primaryAndBackups);
- }
+ List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
ClusterNode primary = lst.get(0).get2();
@@ -376,39 +373,38 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
// Select backups.
if (backups > 0) {
- for (int i = 1; i < lst.size(); i++) {
+ for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
ClusterNode node = next.get2();
if (exclNeighbors) {
- Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res);
+ Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
if (!allNeighbors.contains(node))
res.add(node);
}
- else {
- if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node)))
- res.add(next.get2());
- }
-
- if (res.size() == primaryAndBackups)
- break;
+ else if (backupFilter == null || backupFilter.apply(primary, node))
+ res.add(next.get2());
}
}
if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
- // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria.
- for (int i = 1; i < lst.size(); i++) {
+ // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria.
+ for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
ClusterNode node = next.get2();
if (!res.contains(node))
res.add(next.get2());
+ }
+
+ if (!exclNeighborsWarn) {
+ LT.warn(log, null, "Affinity function excludeNeighbors property is ignored " +
+ "because topology has no enough nodes to assign backups.");
- if (res.size() == primaryAndBackups)
- break;
+ exclNeighborsWarn = true;
}
}
@@ -437,7 +433,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
List<List<ClusterNode>> assignments = new ArrayList<>(parts);
Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
- neighbors(affCtx.currentTopologySnapshot()) : null;
+ GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
for (int i = 0; i < parts; i++) {
List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
@@ -463,6 +459,7 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
parts = in.readInt();
exclNeighbors = in.readBoolean();
@@ -471,57 +468,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
}
/**
- * Builds neighborhood map for all nodes in snapshot.
- *
- * @param topSnapshot Topology snapshot.
- * @return Neighbors map.
- */
- private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
- Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);
-
- // Group by mac addresses.
- for (ClusterNode node : topSnapshot) {
- String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS);
-
- Collection<ClusterNode> nodes = macMap.get(macs);
-
- if (nodes == null) {
- nodes = new HashSet<>();
-
- macMap.put(macs, nodes);
- }
-
- nodes.add(node);
- }
-
- Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
-
- for (Collection<ClusterNode> group : macMap.values()) {
- for (ClusterNode node : group)
- neighbors.put(node.id(), group);
- }
-
- return neighbors;
- }
-
- /**
- * @param neighborhoodCache Neighborhood cache.
- * @param nodes Nodes.
- * @return All neighbors for given nodes.
- */
- private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache,
- Iterable<ClusterNode> nodes) {
- Collection<ClusterNode> res = new HashSet<>();
-
- for (ClusterNode node : nodes) {
- if (!res.contains(node))
- res.addAll(neighborhoodCache.get(node.id()));
- }
-
- return res;
- }
-
- /**
*
*/
private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 74124bf..e0286ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -52,7 +52,6 @@ import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver;
-import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
@@ -364,18 +363,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
CacheType cacheType,
@Nullable CacheStore cfgStore) throws IgniteCheckedException {
if (cc.getCacheMode() == REPLICATED) {
- if (cc.getAffinity() instanceof FairAffinityFunction)
- throw new IgniteCheckedException("REPLICATED cache can not be started with FairAffinityFunction" +
- " [cacheName=" + U.maskName(cc.getName()) + ']');
-
- if (cc.getAffinity() instanceof RendezvousAffinityFunction) {
- RendezvousAffinityFunction aff = (RendezvousAffinityFunction)cc.getAffinity();
-
- if (aff.isExcludeNeighbors())
- throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " +
- "RendezvousAffinityFunction cannot be set [cacheName=" + U.maskName(cc.getName()) + ']');
- }
-
if (cc.getNearConfiguration() != null &&
ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cc.getName())) {
U.warn(log, "Near cache cannot be used with REPLICATED cache, " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 2d5698a..f8dbd4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1805,4 +1806,53 @@ public class GridCacheUtils {
}
};
}
-}
\ No newline at end of file
+
+    /**
+     * Groups the nodes of a topology snapshot by the value of their
+     * {@link IgniteNodeAttributes#ATTR_MACS} attribute and indexes every group by the IDs of its members.
+     *
+     * @param topSnapshot Topology snapshot.
+     * @return Map from node ID to the group of nodes having the same MACs attribute value as that node
+     *      (the group contains the node itself). All members of one group are mapped to the same
+     *      collection instance.
+     */
+    public static Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
+        Map<String, Collection<ClusterNode>> byMacs = new HashMap<>(topSnapshot.size(), 1.0f);
+
+        // First pass: bucket the nodes by MACs attribute value.
+        for (ClusterNode n : topSnapshot) {
+            String key = n.attribute(IgniteNodeAttributes.ATTR_MACS);
+
+            Collection<ClusterNode> bucket = byMacs.get(key);
+
+            if (bucket == null) {
+                bucket = new HashSet<>();
+
+                byMacs.put(key, bucket);
+            }
+
+            bucket.add(n);
+        }
+
+        Map<UUID, Collection<ClusterNode>> res = new HashMap<>(topSnapshot.size(), 1.0f);
+
+        // Second pass: point every node ID at the bucket the node belongs to.
+        for (Collection<ClusterNode> bucket : byMacs.values()) {
+            for (ClusterNode member : bucket)
+                res.put(member.id(), bucket);
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns the union of the neighbor groups of all {@code nodes}.
+     * <p>
+     * A node that has no entry in {@code neighborhood} contributes only itself to the result
+     * instead of causing a {@link NullPointerException}.
+     *
+     * @param neighborhood Neighborhood cache: node ID to the group of that node's neighbors
+     *      (see {@link #neighbors(Collection)}).
+     * @param nodes Nodes.
+     * @return All neighbors for given nodes.
+     */
+    public static Collection<ClusterNode> neighborsForNodes(Map<UUID, Collection<ClusterNode>> neighborhood,
+        Iterable<ClusterNode> nodes) {
+        Collection<ClusterNode> res = new HashSet<>();
+
+        for (ClusterNode node : nodes) {
+            // Already collected, presumably together with its whole group (groups built by
+            // neighbors() contain the node itself), so the lookup can be skipped.
+            if (res.contains(node))
+                continue;
+
+            Collection<ClusterNode> grp = neighborhood.get(node.id());
+
+            if (grp != null)
+                res.addAll(grp);
+            else
+                res.add(node); // Unknown to the cache: the node is at least its own neighbor.
+        }
+
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
new file mode 100644
index 0000000..878d7d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Base class for {@link AffinityFunction} tests. Each scenario changes a synthetic topology of
+ * {@link GridTestNode}s one node at a time and recomputes the partition assignment after every change.
+ */
+public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstractTest {
+    /** MAC prefix. */
+    private static final String MAC_PREF = "MAC";
+
+    /**
+     * Returns affinity function.
+     *
+     * @return Affinity function under test.
+     */
+    protected abstract AffinityFunction affinityFunction();
+
+    /**
+     * Grows and shrinks the topology with no backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedNoBackups() throws Exception {
+        checkNodeRemoved(0);
+    }
+
+    /**
+     * Grows and shrinks the topology with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedOneBackup() throws Exception {
+        checkNodeRemoved(1);
+    }
+
+    /**
+     * Grows and shrinks the topology with two backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedTwoBackups() throws Exception {
+        checkNodeRemoved(2);
+    }
+
+    /**
+     * Grows and shrinks the topology with three backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeRemovedThreeBackups() throws Exception {
+        checkNodeRemoved(3);
+    }
+
+    /**
+     * Randomly adds and removes nodes with no backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentNoBackups() throws Exception {
+        checkRandomReassignment(0);
+    }
+
+    /**
+     * Randomly adds and removes nodes with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentOneBackup() throws Exception {
+        checkRandomReassignment(1);
+    }
+
+    /**
+     * Randomly adds and removes nodes with two backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentTwoBackups() throws Exception {
+        checkRandomReassignment(2);
+    }
+
+    /**
+     * Randomly adds and removes nodes with three backups.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRandomReassignmentThreeBackups() throws Exception {
+        checkRandomReassignment(3);
+    }
+
+    /**
+     * Runs {@link #checkNodeRemoved(int, int, int)} with {@code neighborsPerHost} and
+     * {@code neighborsPeriod} both equal to {@code 1}, so every node gets its own MACs value.
+     *
+     * @param backups Number of backups.
+     * @throws Exception If failed.
+     */
+    protected void checkNodeRemoved(int backups) throws Exception {
+        checkNodeRemoved(backups, 1, 1);
+    }
+
+    /**
+     * Adds 50 nodes one by one, then removes all but one of them (most recently added first),
+     * recomputing and verifying the assignment after every topology change.
+     * <p>
+     * The {@code i}-th added node gets the MACs attribute value
+     * {@code MAC_PREF + ((i / neighborsPeriod) % (nodesCnt / neighborsPerHost))}.
+     *
+     * @param backups Number of backups.
+     * @param neighborsPerHost Divisor of the node count that gives the number of distinct MACs values.
+     *      No MACs attribute is set if this is not positive. Must not exceed the node count,
+     *      otherwise the modulus is zero.
+     * @param neighborsPeriod Number of consecutively added nodes that get the same MACs value.
+     *      Must not be zero.
+     * @throws Exception If failed.
+     */
+    protected void checkNodeRemoved(int backups, int neighborsPerHost, int neighborsPeriod) throws Exception {
+        AffinityFunction aff = affinityFunction();
+
+        int nodesCnt = 50;
+
+        List<ClusterNode> nodes = new ArrayList<>(nodesCnt);
+
+        List<List<ClusterNode>> prev = null;
+
+        for (int i = 0; i < nodesCnt; i++) {
+            info("======================================");
+            info("Assigning partitions: " + i);
+            info("======================================");
+
+            GridTestNode node = new GridTestNode(UUID.randomUUID());
+
+            // ClusterNode.attribute(String) only reads an attribute, so the value has to be stored
+            // explicitly under the MACs attribute that the neighbors lookup reads.
+            if (neighborsPerHost > 0) {
+                node.setAttribute(IgniteNodeAttributes.ATTR_MACS,
+                    MAC_PREF + ((i / neighborsPeriod) % (nodesCnt / neighborsPerHost)));
+            }
+
+            nodes.add(node);
+
+            DiscoveryEvent discoEvt = new DiscoveryEvent(node, "", EventType.EVT_NODE_JOINED, node);
+
+            GridAffinityFunctionContextImpl ctx =
+                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i), backups);
+
+            List<List<ClusterNode>> assignment = aff.assignPartitions(ctx);
+
+            info("Assigned.");
+
+            verifyAssignment(assignment, backups, aff.partitions(), nodes.size());
+
+            prev = assignment;
+        }
+
+        info("======================================");
+        info("Will remove nodes.");
+        info("======================================");
+
+        // Stop at one remaining node: an empty topology is never passed to the affinity function.
+        for (int i = 0; i < nodesCnt - 1; i++) {
+            info("======================================");
+            info("Assigning partitions: " + i);
+            info("======================================");
+
+            ClusterNode rmv = nodes.remove(nodes.size() - 1);
+
+            DiscoveryEvent discoEvt = new DiscoveryEvent(rmv, "", EventType.EVT_NODE_LEFT, rmv);
+
+            List<List<ClusterNode>> assignment = aff.assignPartitions(
+                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
+                    backups));
+
+            info("Assigned.");
+
+            verifyAssignment(assignment, backups, aff.partitions(), nodes.size());
+
+            prev = assignment;
+        }
+    }
+
+    /**
+     * Randomly adds and removes nodes, recomputing and verifying the assignment after every change.
+     * The topology first tends to grow until it reaches 50 nodes, then tends to shrink; the method
+     * returns once fewer than two nodes are left after the maximum has been reached.
+     *
+     * @param backups Backups.
+     */
+    protected void checkRandomReassignment(int backups) {
+        AffinityFunction aff = affinityFunction();
+
+        Random rnd = new Random();
+
+        int maxNodes = 50;
+
+        List<ClusterNode> nodes = new ArrayList<>(maxNodes);
+
+        List<List<ClusterNode>> prev = null;
+
+        // 0 - growing phase, 1 - shrinking phase (entered when the topology reaches maxNodes).
+        int state = 0;
+
+        int i = 0;
+
+        while (true) {
+            boolean add;
+
+            if (nodes.size() < 2) {
+                // Returned back to one node?
+                if (state == 1)
+                    return;
+
+                add = true;
+            }
+            else if (nodes.size() == maxNodes) {
+                if (state == 0)
+                    state = 1;
+
+                add = false;
+            }
+            else {
+                // Nodes size in [2, maxNodes - 1].
+                if (state == 0)
+                    add = rnd.nextInt(3) != 0; // 66% to add, 33% to remove.
+                else
+                    add = rnd.nextInt(3) == 0; // 33% to add, 66% to remove.
+            }
+
+            DiscoveryEvent discoEvt;
+
+            if (add) {
+                ClusterNode addedNode = new GridTestNode(UUID.randomUUID());
+
+                nodes.add(addedNode);
+
+                discoEvt = new DiscoveryEvent(addedNode, "", EventType.EVT_NODE_JOINED, addedNode);
+            }
+            else {
+                ClusterNode rmvNode = nodes.remove(rnd.nextInt(nodes.size()));
+
+                discoEvt = new DiscoveryEvent(rmvNode, "", EventType.EVT_NODE_LEFT, rmvNode);
+            }
+
+            info("======================================");
+            info("Assigning partitions [iter=" + i + ", discoEvt=" + discoEvt + ", nodesSize=" + nodes.size() + ']');
+            info("======================================");
+
+            List<List<ClusterNode>> assignment = aff.assignPartitions(
+                new GridAffinityFunctionContextImpl(nodes, prev, discoEvt, new AffinityTopologyVersion(i),
+                    backups));
+
+            verifyAssignment(assignment, backups, aff.partitions(), nodes.size());
+
+            prev = assignment;
+
+            i++;
+        }
+    }
+
+    /**
+     * Checks that no assigned node is {@code null} and that no node is listed twice for the same
+     * partition, then logs how far the least and the most loaded nodes deviate from the ideal
+     * number of partitions per node. The deviation is only logged, not asserted.
+     *
+     * @param assignment Assignment to verify.
+     * @param keyBackups Number of backups.
+     * @param partsCnt Total number of partitions.
+     * @param topSize Number of nodes in topology.
+     */
+    private void verifyAssignment(List<List<ClusterNode>> assignment, int keyBackups, int partsCnt, int topSize) {
+        Map<UUID, Collection<Integer>> mapping = new HashMap<>();
+
+        // Partition copies per node if all copies were spread evenly over the topology.
+        int ideal = Math.round((float)partsCnt / topSize * Math.min(keyBackups + 1, topSize));
+
+        for (int part = 0; part < assignment.size(); part++) {
+            for (ClusterNode node : assignment.get(part)) {
+                assert node != null;
+
+                Collection<Integer> parts = mapping.get(node.id());
+
+                if (parts == null) {
+                    parts = new HashSet<>();
+
+                    mapping.put(node.id(), parts);
+                }
+
+                assertTrue(parts.add(part));
+            }
+        }
+
+        int max = -1, min = Integer.MAX_VALUE;
+
+        for (Collection<Integer> parts : mapping.values()) {
+            max = Math.max(max, parts.size());
+            min = Math.min(min, parts.size());
+        }
+
+        log().warning("max=" + max + ", min=" + min + ", ideal=" + ideal + ", minDev=" + deviation(min, ideal) + "%, " +
+            "maxDev=" + deviation(max, ideal) + "%");
+    }
+
+    /**
+     * @param val Value.
+     * @param ideal Ideal.
+     * @return Absolute deviation of {@code val} from {@code ideal} in percent of {@code ideal},
+     *      rounded to the nearest integer.
+     */
+    private static int deviation(int val, int ideal) {
+        return Math.round(Math.abs(((float)val - ideal) / ideal * 100));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
new file mode 100644
index 0000000..24704ed
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class AffinityClientNodeSelfTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODE_CNT = 4;
+
+ /** */
+ private static final String CACHE1 = "cache1";
+
+ /** */
+ private static final String CACHE2 = "cache2";
+
+ /** */
+ private static final String CACHE3 = "cache3";
+
+ /** */
+ private static final String CACHE4 = "cache4";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ if (gridName.equals(getTestGridName(NODE_CNT - 1)))
+ cfg.setClientMode(true);
+
+ CacheConfiguration ccfg1 = new CacheConfiguration();
+
+ ccfg1.setBackups(1);
+ ccfg1.setName(CACHE1);
+ ccfg1.setAffinity(new RendezvousAffinityFunction());
+ ccfg1.setNodeFilter(new TestNodesFilter());
+
+ CacheConfiguration ccfg2 = new CacheConfiguration();
+
+ ccfg2.setBackups(1);
+ ccfg2.setName(CACHE2);
+ ccfg2.setAffinity(new RendezvousAffinityFunction());
+
+ CacheConfiguration ccfg3 = new CacheConfiguration();
+
+ ccfg3.setBackups(1);
+ ccfg3.setName(CACHE3);
+ ccfg3.setAffinity(new FairAffinityFunction());
+ ccfg3.setNodeFilter(new TestNodesFilter());
+
+ CacheConfiguration ccfg4 = new CacheConfiguration();
+
+ ccfg4.setCacheMode(REPLICATED);
+ ccfg4.setName(CACHE4);
+ ccfg4.setNodeFilter(new TestNodesFilter());
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODE_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeNotInAffinity() throws Exception {
+ checkCache(CACHE1, 2);
+
+ checkCache(CACHE2, 2);
+
+ checkCache(CACHE3, 2);
+
+ checkCache(CACHE4, 3);
+
+ Ignite client = ignite(NODE_CNT - 1);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setBackups(0);
+
+ ccfg.setNodeFilter(new TestNodesFilter());
+
+ IgniteCache<Integer, Integer> cache = client.createCache(ccfg);
+
+ try {
+ checkCache(null, 1);
+ }
+ finally {
+ cache.destroy();
+ }
+
+ cache = client.createCache(ccfg, new NearCacheConfiguration());
+
+ try {
+ checkCache(null, 1);
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param expNodes Expected number of nodes per partition.
+ */
+ private void checkCache(String cacheName, int expNodes) {
+ log.info("Test cache: " + cacheName);
+
+ Ignite client = ignite(NODE_CNT - 1);
+
+ assertTrue(client.configuration().isClientMode());
+
+ ClusterNode clientNode = client.cluster().localNode();
+
+ for (int i = 0; i < NODE_CNT; i++) {
+ Ignite ignite = ignite(i);
+
+ Affinity<Integer> aff = ignite.affinity(cacheName);
+
+ for (int part = 0; part < aff.partitions(); part++) {
+ Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part);
+
+ assertEquals(expNodes, nodes.size());
+
+ assertFalse(nodes.contains(clientNode));
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestNodesFilter implements IgnitePredicate<ClusterNode> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode clusterNode) {
+ Boolean attr = clusterNode.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
+
+ assertNotNull(attr);
+
+ assertFalse(attr);
+
+ return true;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
new file mode 100644
index 0000000..3bf41c1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+
+/**
+ * Base class for tests of {@link AffinityFunction} implementations configured with a user-provided
+ * backup filter. Grids are started in pairs with different values of a user attribute, and the
+ * filter only admits backups whose attribute value differs from the primary's.
+ */
+public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Backup count. */
+    private static final int BACKUPS = 1;
+
+    /** Split attribute name. */
+    private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
+
+    /** Split attribute value assigned to the next grid to be started. */
+    private String splitAttrVal;
+
+    /** Test backup filter: passes if primary and candidate have different split attribute values. */
+    protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
+        new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+            @Override public boolean apply(ClusterNode primary, ClusterNode backup) {
+                assert primary != null : "primary is null";
+                assert backup != null : "backup is null";
+
+                boolean sameSplit =
+                    F.eq(primary.attribute(SPLIT_ATTRIBUTE_NAME), backup.attribute(SPLIT_ATTRIBUTE_NAME));
+
+                return !sameSplit;
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(BACKUPS);
+        ccfg.setAffinity(affinityFunction());
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        igniteCfg.setCacheConfiguration(ccfg);
+        igniteCfg.setDiscoverySpi(discoSpi);
+
+        // Tag the grid with the split value chosen by the test right before startGrid().
+        igniteCfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));
+
+        return igniteCfg;
+    }
+
+    /**
+     * @return Affinity function for test.
+     */
+    protected abstract AffinityFunction affinityFunction();
+
+    /**
+     * Starts three pairs of grids, one grid with split value "A" and one with "B" per pair, and
+     * checks the partition distribution after each pair has joined.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionDistribution() throws Exception {
+        try {
+            for (int pair = 0; pair < 3; pair++) {
+                int firstIdx = 2 * pair;
+
+                splitAttrVal = "A";
+
+                startGrid(firstIdx);
+
+                splitAttrVal = "B";
+
+                startGrid(firstIdx + 1);
+
+                awaitPartitionMapExchange();
+
+                checkPartitions();
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * For every integer in {@code [0, partitions)} used as a key, checks that it maps to exactly
+     * two nodes and that the first and the last of them have different split attribute values.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void checkPartitions() throws Exception {
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), null).getAffinity();
+
+        int partCnt = aff.partitions();
+
+        IgniteCache<Object, Object> dfltCache = grid(0).cache(null);
+
+        for (int key = 0; key < partCnt; key++) {
+            Collection<ClusterNode> owners = affinity(dfltCache).mapKeyToPrimaryAndBackups(key);
+
+            assertEquals(2, owners.size());
+
+            ClusterNode primary = F.first(owners);
+            ClusterNode backup = F.last(owners);
+
+            Object primarySplit = primary.attribute(SPLIT_ATTRIBUTE_NAME);
+            Object backupSplit = backup.attribute(SPLIT_ATTRIBUTE_NAME);
+
+            assertFalse(F.eq(primarySplit, backupSplit));
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f5a4c9d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
new file mode 100644
index 0000000..10cb5a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+
+/**
+ * Partitioned affinity test.
+ */
+@SuppressWarnings({"PointlessArithmeticExpression", "FieldCanBeLocal"})
+public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends GridCommonAbstractTest {
+ /** Number of backups. */
+ private int backups = 2;
+
+ /** Number of girds. */
+ private int gridInstanceNum;
+
+ /** Ip finder. */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ // Override node attributes in discovery spi.
+ TcpDiscoverySpi spi = new TcpDiscoverySpi() {
+ @Override public void setNodeAttributes(Map<String, Object> attrs,
+ IgniteProductVersion ver) {
+ super.setNodeAttributes(attrs, ver);
+
+ // Set unique mac addresses for every group of three nodes.
+ String macAddrs = "MOCK_MACS_" + (gridInstanceNum / 3);
+
+ attrs.put(IgniteNodeAttributes.ATTR_MACS, macAddrs);
+
+ gridInstanceNum++;
+ }
+ };
+
+ spi.setIpFinder(ipFinder);
+
+ c.setDiscoverySpi(spi);
+
+ CacheConfiguration cc = defaultCacheConfiguration();
+
+ cc.setCacheMode(PARTITIONED);
+
+ cc.setBackups(backups);
+
+ cc.setAffinity(affinityFunction());
+
+ cc.setRebalanceMode(NONE);
+
+ c.setCacheConfiguration(cc);
+
+ return c;
+ }
+
+ /**
+ * @return Affinity function for test.
+ */
+ protected abstract AffinityFunction affinityFunction();
+
+ /**
+ * @param aff Affinity.
+ * @param key Key.
+ * @return Nodes.
+ */
+ private static Collection<? extends ClusterNode> nodes(Affinity<Object> aff, Object key) {
+ return aff.mapKeyToPrimaryAndBackups(key);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityMultiNode() throws Exception {
+ int grids = 9;
+
+ startGrids(grids);
+
+ try {
+ Object key = 12345;
+
+ int copies = backups + 1;
+
+ for (int i = 0; i < grids; i++) {
+ final Ignite g = grid(i);
+
+ Affinity<Object> aff = g.affinity(null);
+
+ List<TcpDiscoveryNode> top = new ArrayList<>();
+
+ for (ClusterNode node : g.cluster().nodes())
+ top.add((TcpDiscoveryNode) node);
+
+ Collections.sort(top);
+
+ assertEquals(grids, top.size());
+
+ int idx = 1;
+
+ for (ClusterNode n : top) {
+ assertEquals(idx, n.order());
+
+ idx++;
+ }
+
+ Collection<? extends ClusterNode> affNodes = nodes(aff, key);
+
+ info("Affinity picture for grid [i=" + i + ", aff=" + U.toShortString(affNodes));
+
+ assertEquals(copies, affNodes.size());
+
+ Set<String> macs = new HashSet<>();
+
+ for (ClusterNode node : affNodes)
+ macs.add((String)node.attribute(IgniteNodeAttributes.ATTR_MACS));
+
+ assertEquals(copies, macs.size());
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinitySingleNode() throws Exception {
+ Ignite g = startGrid();
+
+ try {
+ Object key = 12345;
+
+ Collection<? extends ClusterNode> affNodes = nodes(g.affinity(null), key);
+
+ info("Affinity picture for grid: " + U.toShortString(affNodes));
+
+ assertEquals(1, affNodes.size());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
\ No newline at end of file