You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/02/20 06:52:03 UTC
[ignite] branch master updated: IGNITE-13560 Add node attribute
colocated affinity backup filter - Fixes #8668.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1a64ddc IGNITE-13560 Add node attribute colocated affinity backup filter - Fixes #8668.
1a64ddc is described below
commit 1a64ddc25e9c60b835312467b760bfaf1dd541cd
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Sat Feb 20 09:45:32 2021 +0300
IGNITE-13560 Add node attribute colocated affinity backup filter - Fixes #8668.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../ClusterNodeAttributeColocatedBackupFilter.java | 125 +++++++++++
.../rendezvous/RendezvousAffinityFunction.java | 35 ++-
.../cache/GridCachePartitionExchangeManager.java | 2 +-
...finityFunctionBackupFilterAbstractSelfTest.java | 24 +-
...tyFunctionExcludeNeighborsAbstractSelfTest.java | 2 +-
...NodeAttributeColocatedBackupFilterSelfTest.java | 250 +++++++++++++++++++++
...usAffinityFunctionExcludeNeighborsSelfTest.java | 64 +++++-
.../GridOffHeapPartitionedMapAbstractSelfTest.java | 2 -
.../testsuites/IgniteCacheMvccTestSuite2.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite2.java | 2 +
10 files changed, 486 insertions(+), 22 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
new file mode 100644
index 0000000..6f70610
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * This class can be used as a {@link RendezvousAffinityFunction#affinityBackupFilter } to create
+ * cache templates in Spring that force each partition's primary and backup to be co-located on nodes with the same
+ * attribute value.
+ * <p>
+ *
+ * Partition copies co-location can be helpful to group nodes into cells when a fixed baseline topology is used. If
+ * all copies of each partition are located inside only one cell, then when {@code backup + 1} nodes leave the
+ * cluster, data will be lost only if all leaving nodes belong to the same cell. Without partition copies co-location
+ * within a cell, data will most probably be lost if any {@code backup + 1} nodes leave the cluster.
+ *
+ * Note: Baseline topology change can lead to inter-cell partitions migration, i.e. rebalance can affect all copies
+ * of some partitions even if only one node is changed in the baseline topology.
+ * <p>
+ *
+ * This implementation will discard backups rather than place copies on nodes with different attribute values. This
+ * avoids trying to cram more data onto remaining nodes when some have failed.
+ * <p>
+ * A node attribute to compare is provided on construction. Attribute name shouldn't be null or empty.
+ *
+ * Note: All cluster nodes, on startup, automatically register all the environment and system properties as node
+ * attributes.
+ *
+ * Note: All nodes should have a non-empty co-location attribute value. The absence of the attribute on some nodes
+ * will trigger the failure handler.
+ *
+ * Note: Node attributes are persisted in the baseline topology at the time of a baseline topology change. If the
+ * co-location attribute of some node was updated, but the baseline topology wasn't changed, the outdated attribute
+ * value can be used by the backup filter after this node leaves the cluster. To avoid this, the baseline topology
+ * should be updated after changing the co-location attribute.
+ * <p>
+ * This class is constructed with a node attribute name, and a candidate node will be rejected if its value of
+ * this attribute differs from the value on the nodes previously selected for the partition.
+ *
+ * <h2 class="header">Spring Example</h2>
+ * Create a partitioned cache template with 1 backup, where the backup will be placed in the same cell
+ * as the primary. Note: This example requires that the environment variable "CELL" be set appropriately on
+ * each node via some means external to Ignite.
+ * <pre name="code" class="xml">
+ * <property name="cacheConfiguration">
+ * <list>
+ * <bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration">
+ * <property name="name" value="JobcaseDefaultCacheConfig*"/>
+ * <property name="cacheMode" value="PARTITIONED" />
+ * <property name="backups" value="1" />
+ * <property name="affinity">
+ * <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
+ * <property name="affinityBackupFilter">
+ * <bean class="org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter">
+ * <!-- Backups must go to the same CELL as primary -->
+ * <constructor-arg value="CELL" />
+ * </bean>
+ * </property>
+ * </bean>
+ * </property>
+ * </bean>
+ * </list>
+ * </property>
+ * </pre>
+ * <p>
+ */
+public class ClusterNodeAttributeColocatedBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
+ /** */
+ private static final long serialVersionUID = 1L;
+
+ /** Attribute name. */
+ private final String attrName;
+
+ /**
+ * @param attrName The attribute name for the attribute to compare.
+ */
+ public ClusterNodeAttributeColocatedBackupFilter(String attrName) {
+ A.notNullOrEmpty(attrName, "attrName");
+
+ this.attrName = attrName;
+ }
+
+ /**
+ * Defines a predicate which returns {@code true} if a node is acceptable for a backup
+ * or {@code false} otherwise. An acceptable node is one whose attribute value
+ * exactly matches the attribute value of the primary node, i.e. the first of the
+ * previously selected nodes. If the attribute does not exist on the candidate node
+ * or on the primary node, an {@link IllegalStateException} is thrown instead of
+ * returning a result.
+ *
+ * @param candidate A node that is a candidate for becoming a backup node for a partition.
+ * @param previouslySelected A list of primary/backup nodes already chosen for a partition.
+ * The primary is first.
+ */
+ @Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) {
+ A.notEmpty(previouslySelected, "previouslySelected");
+
+ String primaryAttrVal = previouslySelected.get(0).attribute(attrName);
+ String candidateAttrVal = candidate.attribute(attrName);
+
+ if (primaryAttrVal == null || candidateAttrVal == null)
+ throw new IllegalStateException("Empty co-location attribute value");
+
+ return primaryAttrVal.equals(candidateAttrVal);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index d5ace1d..4cb2cdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -33,6 +33,9 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -41,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
@@ -100,6 +104,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Serializabl
@LoggerResource
private transient IgniteLogger log;
+ /** Ignite instance. */
+ @IgniteInstanceResource
+ private transient IgniteEx ignite;
+
/**
* Helper method to calculates mask.
*
@@ -386,20 +394,23 @@ public class RendezvousAffinityFunction implements AffinityFunction, Serializabl
while (it.hasNext() && res.size() < primaryAndBackups) {
ClusterNode node = it.next();
- if (exclNeighbors) {
- if (!allNeighbors.contains(node)) {
- res.add(node);
-
- allNeighbors.addAll(neighborhoodCache.get(node.id()));
+ try {
+ if ((backupFilter != null && backupFilter.apply(primary, node))
+ || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+ || (affinityBackupFilter == null && backupFilter == null)) {
+ if (exclNeighbors) {
+ if (!allNeighbors.contains(node)) {
+ res.add(node);
+
+ allNeighbors.addAll(neighborhoodCache.get(node.id()));
+ }
+ }
+ else
+ res.add(node);
}
}
- else if ((backupFilter != null && backupFilter.apply(primary, node))
- || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
- || (affinityBackupFilter == null && backupFilter == null) ) {
- res.add(node);
-
- if (exclNeighbors)
- allNeighbors.addAll(neighborhoodCache.get(node.id()));
+ catch (Exception ex) {
+ ignite.context().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 126a27e..6659c08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1385,7 +1385,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// No need to send to nodes which did not finish their first exchange.
AffinityTopologyVersion rmtTopVer =
lastFut != null ?
- (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion())
+ (lastFut.isDone() && lastFut.error() == null ? lastFut.topologyVersion() : lastFut.initialVersion())
: AffinityTopologyVersion.NONE;
Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer);
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
index 78b6c46..2301a62 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -42,7 +43,7 @@ import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
*/
public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridCommonAbstractTest {
/** Split attribute name. */
- private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
+ protected static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
/** Split attribute value. */
private String splitAttrVal;
@@ -51,7 +52,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
public static final String FIRST_NODE_GROUP = "A";
/** Backup count. */
- private int backups = 1;
+ protected int backups = 1;
/** Test backup filter. */
protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
@@ -87,7 +88,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
* @param nodes List of cluster nodes.
* @return Statistic.
*/
- @NotNull private static Map<String, Integer> getAttributeStatistic(Collection<ClusterNode> nodes) {
+ @NotNull protected static Map<String, Integer> getAttributeStatistic(Collection<ClusterNode> nodes) {
Map<String, Integer> backupAssignedAttribute = new HashMap<>();
backupAssignedAttribute.put(FIRST_NODE_GROUP, 0);
@@ -130,6 +131,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
cfg.setCacheConfiguration(cacheCfg);
cfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));
+ cfg.setConsistentId(igniteInstanceName);
return cfg;
}
@@ -175,7 +177,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
* @throws Exception If failed.
*/
@SuppressWarnings("ConstantConditions")
- private void checkPartitions() throws Exception {
+ protected void checkPartitions() throws Exception {
AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
int partCnt = aff.partitions();
@@ -227,6 +229,18 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
}
}
+ /**
+ * Start grid with split attribute value.
+ *
+ * @param gridIdx Grid index.
+ * @param splitAttrVal Split attribute value.
+ */
+ protected IgniteEx startGrid(int gridIdx, String splitAttrVal) throws Exception {
+ this.splitAttrVal = splitAttrVal;
+
+ return startGrid(gridIdx);
+ }
+
/* Different affinityBackupFilters have different goals */
protected int expectedNodesForEachPartition() {
return backups + 1;
@@ -235,7 +249,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
/**
* @throws Exception If failed.
*/
- private void checkPartitionsWithAffinityBackupFilter() throws Exception {
+ protected void checkPartitionsWithAffinityBackupFilter() throws Exception {
AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
int partCnt = aff.partitions();
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
index 70ad722..6669935 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
@@ -43,7 +43,7 @@ import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
*/
public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends GridCommonAbstractTest {
/** Number of backups. */
- private int backups = 2;
+ protected int backups = 2;
/** Number of girds. */
private int gridInstanceNum;
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java
new file mode 100644
index 0000000..36831e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionBackupFilterAbstractSelfTest;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests of {@link AffinityFunction} implementations with {@link ClusterNodeAttributeColocatedBackupFilter}.
+ */
+public class ClusterNodeAttributeColocatedBackupFilterSelfTest extends AffinityFunctionBackupFilterAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setFailureHandler((i, f) -> true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected AffinityFunction affinityFunction() {
+ return affinityFunctionWithAffinityBackupFilter(SPLIT_ATTRIBUTE_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter(String attrName) {
+ RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false);
+
+ aff.setAffinityBackupFilter(new ClusterNodeAttributeColocatedBackupFilter(attrName));
+
+ return aff;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkPartitionsWithAffinityBackupFilter() {
+ AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+ int partCnt = aff.partitions();
+
+ int iter = grid(0).cluster().nodes().size() / 4;
+
+ IgniteCache<Object, Object> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < partCnt; i++) {
+ Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
+
+ Map<String, Integer> stat = getAttributeStatistic(nodes);
+
+ if (stat.get(FIRST_NODE_GROUP) > 0) {
+ assertEquals((Integer)Math.min(backups + 1, iter * 2), stat.get(FIRST_NODE_GROUP));
+ assertEquals((Integer)0, stat.get("B"));
+ assertEquals((Integer)0, stat.get("C"));
+ }
+ else if (stat.get("B") > 0) {
+ assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+ assertEquals((Integer)iter, stat.get("B"));
+ assertEquals((Integer)0, stat.get("C"));
+ }
+ else if (stat.get("C") > 0) {
+ assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+ assertEquals((Integer)0, stat.get("B"));
+ assertEquals((Integer)iter, stat.get("C"));
+ }
+ else
+ fail("Unexpected partition assignment");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkPartitions() throws Exception {
+ int iter = grid(0).cluster().nodes().size() / 2;
+
+ AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+ Map<Integer, String> partToAttr = partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+ assertTrue(F.exist(partToAttr.values(), "A"::equals));
+ assertTrue(F.exist(partToAttr.values(), "B"::equals));
+ assertFalse(F.exist(partToAttr.values(), v -> !"A".equals(v) && !"B".equals(v)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
+ backups = 2;
+
+ super.testPartitionDistributionWithAffinityBackupFilter();
+ }
+
+ /** */
+ @Test
+ public void testBackupFilterWithBaseline() throws Exception {
+ backups = 1;
+
+ try {
+ startGrid(0, "A");
+ startGrid(1, "B");
+ startGrid(2, "C");
+
+ startGrid(3, "A");
+ startGrid(4, "B");
+ startGrid(5, "C");
+
+ awaitPartitionMapExchange();
+
+ AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+ Map<Integer, String> partToAttr = partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ // Check that we have the same distribution if some BLT nodes are offline.
+ stopGrid(3);
+ stopGrid(4);
+ stopGrid(5);
+
+ awaitPartitionMapExchange();
+
+ assertEquals(partToAttr, partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+ // Check that non-BLT nodes do not affect distribution.
+ startGrid(6, "D");
+
+ awaitPartitionMapExchange();
+
+ assertEquals(partToAttr, partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+ // Check that distribution is recalculated after BLT change.
+ long topVer = grid(0).cluster().topologyVersion();
+
+ grid(0).cluster().setBaselineTopology(topVer);
+
+ // Wait for rebalance and assignment change to ideal assignment.
+ assertTrue(GridTestUtils.waitForCondition(() -> F.eq(grid(0).context().discovery().topologyVersionEx(),
+ new AffinityTopologyVersion(topVer, 2)), 5_000L));
+
+ assertNotEquals(partToAttr, partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /** */
+ @Test
+ public void testBackupFilterNullAttributeBltChange() throws Exception {
+ backups = 1;
+
+ try {
+ startGrid(0, "A");
+ startGrid(1, "A");
+
+ awaitPartitionMapExchange();
+
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ // Join of non-BLT node with the empty attribute should not trigger failure handler.
+ startGrid(2, (String)null);
+
+ assertNull(grid(0).context().failure().failureContext());
+ assertNull(grid(1).context().failure().failureContext());
+ assertNull(grid(2).context().failure().failureContext());
+
+ // Including a node with the empty attribute in the BLT should trigger failure handler on all nodes.
+ resetBaselineTopology();
+
+ assertNotNull(grid(0).context().failure().failureContext());
+ assertNotNull(grid(1).context().failure().failureContext());
+ assertNotNull(grid(2).context().failure().failureContext());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /** */
+ @Test
+ public void testBackupFilterNullAttributeBltNodeJoin() throws Exception {
+ backups = 1;
+
+ try {
+ startGrid(0, "A");
+ startGrid(1, "A");
+
+ awaitPartitionMapExchange();
+
+ grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+ stopGrid(1);
+
+ awaitPartitionMapExchange();
+
+ // Join of BLT node with the empty attribute should trigger failure handler on this node.
+ startGrid(1, (String)null);
+
+ assertNull(grid(0).context().failure().failureContext());
+ assertNotNull(grid(1).context().failure().failureContext());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Determine split attribute value for each partition and check that this value is the same for all nodes for
+ * this partition.
+ */
+ private Map<Integer, String> partToAttribute(IgniteCache<Object, Object> cache, int partCnt) {
+ Map<Integer, String> partToAttr = U.newHashMap(partCnt);
+
+ for (int i = 0; i < partCnt; i++) {
+ Collection<ClusterNode> nodes = affinity(cache).mapPartitionToPrimaryAndBackups(i);
+
+ assertFalse(F.isEmpty(nodes));
+ assertFalse(F.size(nodes) > backups + 1);
+
+ String attrVal = F.first(nodes).attribute(SPLIT_ATTRIBUTE_NAME);
+
+ partToAttr.put(i, attrVal);
+
+ for (ClusterNode node : nodes)
+ assertEquals(attrVal, node.attribute(SPLIT_ATTRIBUTE_NAME));
+ }
+
+ return partToAttr;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
index 2c6c206..4508db8 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
@@ -17,16 +17,78 @@
package org.apache.ignite.cache.affinity.rendezvous;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityFunctionExcludeNeighborsAbstractSelfTest;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.junit.Test;
/**
* Tests exclude neighbors flag for rendezvous affinity function.
*/
public class RendezvousAffinityFunctionExcludeNeighborsSelfTest extends
AffinityFunctionExcludeNeighborsAbstractSelfTest {
+ /** With backup filter flag. */
+ private boolean withBackupFilter;
+
/** {@inheritDoc} */
@Override protected AffinityFunction affinityFunction() {
- return new RendezvousAffinityFunction(true);
+ if (withBackupFilter) {
+ return new RendezvousAffinityFunction(true)
+ .setAffinityBackupFilter((c, l) -> backupFilter(l.get(0)) == backupFilter(c));
+ }
+ else
+ return new RendezvousAffinityFunction(true);
+ }
+
+ /** Checks that exclude neighbors flag and backup filter work together. */
+ @Test
+ public void testAffinityWithBackupFilter() throws Exception {
+ int grids = 9;
+ withBackupFilter = true;
+ int copies = backups + 1;
+
+ try {
+ startGrids(grids);
+
+ awaitPartitionMapExchange();
+
+ Affinity<Integer> aff = grid(0).affinity(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < aff.partitions(); i++) {
+ Collection<ClusterNode> affNodes = aff.mapKeyToPrimaryAndBackups(i);
+
+ assertEquals(copies, affNodes.size());
+
+ Set<String> macs = new HashSet<>();
+
+ long backupFilterKey = -1L;
+
+ for (ClusterNode node : affNodes) {
+ macs.add(node.attribute(IgniteNodeAttributes.ATTR_MACS));
+
+ if (backupFilterKey < 0)
+ backupFilterKey = backupFilter(node);
+ else
+ assertEquals(backupFilterKey, backupFilter(node));
+ }
+
+ assertEquals(copies, macs.size());
+ }
+ }
+ finally {
+ withBackupFilter = false;
+
+ stopAllGrids();
+ }
+ }
+
+ /** Value for filtering nodes by backup filter. */
+ private static long backupFilter(ClusterNode node) {
+ return node.order() % 3;
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
index c585528..ae3a0c9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
@@ -255,8 +255,6 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
AffinityFunction aff = new RendezvousAffinityFunction(parts, null);
- getTestResources().inject(aff);
-
GridByteArrayWrapper[] keys = new GridByteArrayWrapper[512];
Random rnd = new Random();
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
index b0e7df4..b9768c2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilterSelfTest;
+import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilterSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest;
@@ -168,6 +169,7 @@ public class IgniteCacheMvccTestSuite2 {
ignoredTests.add(GridCachePartitionedProjectionAffinitySelfTest.class);
ignoredTests.add(RendezvousAffinityFunctionBackupFilterSelfTest.class);
ignoredTests.add(ClusterNodeAttributeAffinityBackupFilterSelfTest.class);
+ ignoredTests.add(ClusterNodeAttributeColocatedBackupFilterSelfTest.class);
ignoredTests.add(NonAffinityCoordinatorDynamicStartStopTest.class);
ignoredTests.add(NoneRebalanceModeSelfTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index bdb174a..87ef591 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilterSelfTest;
+import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilterSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest;
@@ -378,6 +379,7 @@ public class IgniteCacheTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, RendezvousAffinityFunctionBackupFilterSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeAffinityBackupFilterSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeColocatedBackupFilterSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CachePartitionStateTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheComparatorTest.class, ignoredTests);