You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/02/20 06:52:03 UTC

[ignite] branch master updated: IGNITE-13560 Add node attribute colocated affinity backup filter - Fixes #8668.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a64ddc  IGNITE-13560 Add node attribute colocated affinity backup filter - Fixes #8668.
1a64ddc is described below

commit 1a64ddc25e9c60b835312467b760bfaf1dd541cd
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Sat Feb 20 09:45:32 2021 +0300

    IGNITE-13560 Add node attribute colocated affinity backup filter - Fixes #8668.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../ClusterNodeAttributeColocatedBackupFilter.java | 125 +++++++++++
 .../rendezvous/RendezvousAffinityFunction.java     |  35 ++-
 .../cache/GridCachePartitionExchangeManager.java   |   2 +-
 ...finityFunctionBackupFilterAbstractSelfTest.java |  24 +-
 ...tyFunctionExcludeNeighborsAbstractSelfTest.java |   2 +-
 ...NodeAttributeColocatedBackupFilterSelfTest.java | 250 +++++++++++++++++++++
 ...usAffinityFunctionExcludeNeighborsSelfTest.java |  64 +++++-
 .../GridOffHeapPartitionedMapAbstractSelfTest.java |   2 -
 .../testsuites/IgniteCacheMvccTestSuite2.java      |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite2.java   |   2 +
 10 files changed, 486 insertions(+), 22 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
new file mode 100644
index 0000000..6f70610
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * This class can be used as a {@link RendezvousAffinityFunction#affinityBackupFilter } to create
+ * cache templates in Spring that force each partition's primary and backup to be co-located on nodes with the same
+ * attribute value.
+ * <p>
+ *
+ * Partition copies co-location can be helpful to group nodes into cells when fixed baseline topology is used. If all
+ * copies of each partition are located inside only one cell, then when {@code backup + 1} nodes leave the cluster,
+ * data will be lost only if all leaving nodes belong to the same cell. Without partition copies co-location
+ * within a cell, data will most probably be lost if any {@code backup + 1} nodes leave the cluster.
+ *
+ * Note: Baseline topology change can lead to inter-cell partitions migration, i.e. rebalance can affect all copies
+ * of some partitions even if only one node is changed in the baseline topology.
+ * <p>
+ *
+ * This implementation will discard backups rather than place copies on nodes with different attribute values. This
+ * avoids trying to cram more data onto remaining nodes when some have failed.
+ * <p>
+ * A node attribute to compare is provided on construction. Attribute name shouldn't be null.
+ *
+ * Note: All cluster nodes, on startup, automatically register all the environment and system properties as node
+ * attributes.
+ *
+ * Note: All nodes must have a non-empty co-location attribute value. The absence of the attribute on some nodes
+ * will trigger the failure handler.
+ *
+ * Note: Node attributes are persisted in the baseline topology at the time of a baseline topology change. If the
+ * co-location attribute of some node was updated, but the baseline topology wasn't changed, the outdated attribute
+ * value can be used by the backup filter once this node has left the cluster. To avoid this, the baseline topology
+ * should be updated after changing the co-location attribute.
+ * <p>
+ * This class is constructed with a node attribute name, and a candidate node will be rejected if its value for that
+ * attribute differs from the value on the nodes previously selected for the partition.
+ * <p>
+ * <h2 class="header">Spring Example</h2>
+ * Create a partitioned cache template with 1 backup, where the backup will be placed in the same cell
+ * as the primary. Note: This example requires that the environment variable "CELL" be set appropriately on
+ * each node via some means external to Ignite.
+ * <pre name="code" class="xml">
+ * &lt;property name="cacheConfiguration"&gt;
+ *     &lt;list&gt;
+ *         &lt;bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration"&gt;
+ *             &lt;property name="name" value="JobcaseDefaultCacheConfig*"/&gt;
+ *             &lt;property name="cacheMode" value="PARTITIONED" /&gt;
+ *             &lt;property name="backups" value="1" /&gt;
+ *             &lt;property name="affinity"&gt;
+ *                 &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"&gt;
+ *                     &lt;property name="affinityBackupFilter"&gt;
+ *                         &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter"&gt;
+ *                             &lt;!-- Backups must go to the same CELL as primary --&gt;
+ *                             &lt;constructor-arg value="CELL" /&gt;
+ *                         &lt;/bean&gt;
+ *                     &lt;/property&gt;
+ *                 &lt;/bean&gt;
+ *             &lt;/property&gt;
+ *         &lt;/bean&gt;
+ *     &lt;/list&gt;
+ * &lt;/property&gt;
+ * </pre>
+ * <p>
+ */
+public class ClusterNodeAttributeColocatedBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    /** Attribute name. */
+    private final String attrName;
+
+    /**
+     * @param attrName The attribute name for the attribute to compare.
+     */
+    public ClusterNodeAttributeColocatedBackupFilter(String attrName) {
+        A.notNullOrEmpty(attrName, "attrName");
+
+        this.attrName = attrName;
+    }
+
+    /**
+     * Defines a predicate which returns {@code true} if a node is acceptable for a backup
+     * or {@code false} otherwise. An acceptable node is one whose attribute value
+     * exactly matches the attribute value of the primary node (the first of the previously
+     * selected nodes). If the attribute is absent on either the candidate node or the
+     * primary node, an {@link IllegalStateException} is thrown instead of treating the
+     * missing value as a match.
+     *
+     * @param candidate          A node that is a candidate for becoming a backup node for a partition.
+     * @param previouslySelected A list of primary/backup nodes already chosen for a partition.
+     *                           The primary is first.
+     */
+    @Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) {
+        A.notEmpty(previouslySelected, "previouslySelected");
+
+        String primaryAttrVal = previouslySelected.get(0).attribute(attrName);
+        String candidateAttrVal = candidate.attribute(attrName);
+
+        if (primaryAttrVal == null || candidateAttrVal == null)
+            throw new IllegalStateException("Empty co-location attribute value");
+
+        return primaryAttrVal.equals(candidateAttrVal);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index d5ace1d..4cb2cdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -33,6 +33,9 @@ import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -41,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
@@ -100,6 +104,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Serializabl
     @LoggerResource
     private transient IgniteLogger log;
 
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private transient IgniteEx ignite;
+
     /**
      * Helper method to calculates mask.
      *
@@ -386,20 +394,23 @@ public class RendezvousAffinityFunction implements AffinityFunction, Serializabl
             while (it.hasNext() && res.size() < primaryAndBackups) {
                 ClusterNode node = it.next();
 
-                if (exclNeighbors) {
-                    if (!allNeighbors.contains(node)) {
-                        res.add(node);
-
-                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                try {
+                    if ((backupFilter != null && backupFilter.apply(primary, node))
+                            || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+                            || (affinityBackupFilter == null && backupFilter == null)) {
+                        if (exclNeighbors) {
+                            if (!allNeighbors.contains(node)) {
+                                res.add(node);
+
+                                allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                            }
+                        }
+                        else
+                            res.add(node);
                     }
                 }
-                else if ((backupFilter != null && backupFilter.apply(primary, node))
-                    || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
-                    || (affinityBackupFilter == null && backupFilter == null) ) {
-                    res.add(node);
-
-                    if (exclNeighbors)
-                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                catch (Exception ex) {
+                    ignite.context().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
                 }
             }
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 126a27e..6659c08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1385,7 +1385,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             // No need to send to nodes which did not finish their first exchange.
             AffinityTopologyVersion rmtTopVer =
                 lastFut != null ?
-                    (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion())
+                    (lastFut.isDone() && lastFut.error() == null ? lastFut.topologyVersion() : lastFut.initialVersion())
                     : AffinityTopologyVersion.NONE;
 
             Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer);
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
index 78b6c46..2301a62 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionBackupFilterAbstractSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -42,7 +43,7 @@ import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
  */
 public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridCommonAbstractTest {
     /** Split attribute name. */
-    private static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
+    protected static final String SPLIT_ATTRIBUTE_NAME = "split-attribute";
 
     /** Split attribute value. */
     private String splitAttrVal;
@@ -51,7 +52,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
     public static final String FIRST_NODE_GROUP = "A";
 
     /** Backup count. */
-    private int backups = 1;
+    protected int backups = 1;
 
     /** Test backup filter. */
     protected static final IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter =
@@ -87,7 +88,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
      * @param nodes List of cluster nodes.
      * @return Statistic.
      */
-    @NotNull private static Map<String, Integer> getAttributeStatistic(Collection<ClusterNode> nodes) {
+    @NotNull protected static Map<String, Integer> getAttributeStatistic(Collection<ClusterNode> nodes) {
         Map<String, Integer> backupAssignedAttribute = new HashMap<>();
 
         backupAssignedAttribute.put(FIRST_NODE_GROUP, 0);
@@ -130,6 +131,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
 
         cfg.setCacheConfiguration(cacheCfg);
         cfg.setUserAttributes(F.asMap(SPLIT_ATTRIBUTE_NAME, splitAttrVal));
+        cfg.setConsistentId(igniteInstanceName);
 
         return cfg;
     }
@@ -175,7 +177,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
      * @throws Exception If failed.
      */
     @SuppressWarnings("ConstantConditions")
-    private void checkPartitions() throws Exception {
+    protected void checkPartitions() throws Exception {
         AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
 
         int partCnt = aff.partitions();
@@ -227,6 +229,18 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
         }
     }
 
+    /**
+     * Start grid with split attribute value.
+     *
+     * @param gridIdx Grid index.
+     * @param splitAttrVal Split attribute value.
+     */
+    protected IgniteEx startGrid(int gridIdx, String splitAttrVal) throws Exception {
+        this.splitAttrVal = splitAttrVal;
+
+        return startGrid(gridIdx);
+    }
+
     /* Different affinityBackupFilters have different goals */
     protected int expectedNodesForEachPartition() {
        return backups + 1;
@@ -235,7 +249,7 @@ public abstract class AffinityFunctionBackupFilterAbstractSelfTest extends GridC
     /**
      * @throws Exception If failed.
      */
-    private void checkPartitionsWithAffinityBackupFilter() throws Exception {
+    protected void checkPartitionsWithAffinityBackupFilter() throws Exception {
         AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
 
         int partCnt = aff.partitions();
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
index 70ad722..6669935 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java
@@ -43,7 +43,7 @@ import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
  */
 public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends GridCommonAbstractTest {
     /** Number of backups. */
-    private int backups = 2;
+    protected int backups = 2;
 
     /** Number of girds. */
     private int gridInstanceNum;
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java
new file mode 100644
index 0000000..36831e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionBackupFilterAbstractSelfTest;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests of {@link AffinityFunction} implementations with {@link ClusterNodeAttributeColocatedBackupFilter}.
+ */
+public class ClusterNodeAttributeColocatedBackupFilterSelfTest extends AffinityFunctionBackupFilterAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setFailureHandler((i, f) -> true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunction() {
+        return affinityFunctionWithAffinityBackupFilter(SPLIT_ATTRIBUTE_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter(String attrName) {
+        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false);
+
+        aff.setAffinityBackupFilter(new ClusterNodeAttributeColocatedBackupFilter(attrName));
+
+        return aff;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPartitionsWithAffinityBackupFilter() {
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+        int partCnt = aff.partitions();
+
+        int iter = grid(0).cluster().nodes().size() / 4;
+
+        IgniteCache<Object, Object> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < partCnt; i++) {
+            Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
+
+            Map<String, Integer> stat = getAttributeStatistic(nodes);
+
+            if (stat.get(FIRST_NODE_GROUP) > 0) {
+                assertEquals((Integer)Math.min(backups + 1, iter * 2), stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)0, stat.get("B"));
+                assertEquals((Integer)0, stat.get("C"));
+            }
+            else if (stat.get("B") > 0) {
+                assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)iter, stat.get("B"));
+                assertEquals((Integer)0, stat.get("C"));
+            }
+            else if (stat.get("C") > 0) {
+                assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)0, stat.get("B"));
+                assertEquals((Integer)iter, stat.get("C"));
+            }
+            else
+                fail("Unexpected partition assignment");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPartitions() throws Exception {
+        int iter = grid(0).cluster().nodes().size() / 2;
+
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+        Map<Integer, String> partToAttr = partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+        assertTrue(F.exist(partToAttr.values(), "A"::equals));
+        assertTrue(F.exist(partToAttr.values(), "B"::equals));
+        assertFalse(F.exist(partToAttr.values(), v -> !"A".equals(v) && !"B".equals(v)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
+        backups = 2;
+
+        super.testPartitionDistributionWithAffinityBackupFilter();
+    }
+
+    /** */
+    @Test
+    public void testBackupFilterWithBaseline() throws Exception {
+        backups = 1;
+
+        try {
+            startGrid(0, "A");
+            startGrid(1, "B");
+            startGrid(2, "C");
+
+            startGrid(3, "A");
+            startGrid(4, "B");
+            startGrid(5, "C");
+
+            awaitPartitionMapExchange();
+
+            AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+            Map<Integer, String> partToAttr = partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+            grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+            // Check that we have the same distribution if some BLT nodes are offline.
+            stopGrid(3);
+            stopGrid(4);
+            stopGrid(5);
+
+            awaitPartitionMapExchange();
+
+            assertEquals(partToAttr, partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+            // Check that non-BLT nodes do not affect distribution.
+            startGrid(6, "D");
+
+            awaitPartitionMapExchange();
+
+            assertEquals(partToAttr, partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+            // Check that distribution is recalculated after BLT change.
+            long topVer = grid(0).cluster().topologyVersion();
+
+            grid(0).cluster().setBaselineTopology(topVer);
+
+            // Wait for rebalance and assignment change to ideal assignment.
+            assertTrue(GridTestUtils.waitForCondition(() -> F.eq(grid(0).context().discovery().topologyVersionEx(),
+                    new AffinityTopologyVersion(topVer, 2)), 5_000L));
+
+            assertNotEquals(partToAttr, partToAttribute(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** */
+    @Test
+    public void testBackupFilterNullAttributeBltChange() throws Exception {
+        backups = 1;
+
+        try {
+            startGrid(0, "A");
+            startGrid(1, "A");
+
+            awaitPartitionMapExchange();
+
+            grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+            // Join of non-BLT node with the empty attribute should not trigger failure handler.
+            startGrid(2, (String)null);
+
+            assertNull(grid(0).context().failure().failureContext());
+            assertNull(grid(1).context().failure().failureContext());
+            assertNull(grid(2).context().failure().failureContext());
+
+            // Including a node with an empty attribute in the BLT should trigger the failure handler on all nodes.
+            resetBaselineTopology();
+
+            assertNotNull(grid(0).context().failure().failureContext());
+            assertNotNull(grid(1).context().failure().failureContext());
+            assertNotNull(grid(2).context().failure().failureContext());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** */
+    @Test
+    public void testBackupFilterNullAttributeBltNodeJoin() throws Exception {
+        backups = 1;
+
+        try {
+            startGrid(0, "A");
+            startGrid(1, "A");
+
+            awaitPartitionMapExchange();
+
+            grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+            stopGrid(1);
+
+            awaitPartitionMapExchange();
+
+            // Join of BLT node with the empty attribute should trigger failure handler on this node.
+            startGrid(1, (String)null);
+
+            assertNull(grid(0).context().failure().failureContext());
+            assertNotNull(grid(1).context().failure().failureContext());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Determine split attribute value for each partition and check that this value is the same for all nodes for
+     * this partition.
+     */
+    private Map<Integer, String> partToAttribute(IgniteCache<Object, Object> cache, int partCnt) {
+        Map<Integer, String> partToAttr = U.newHashMap(partCnt);
+
+        for (int i = 0; i < partCnt; i++) {
+            Collection<ClusterNode> nodes = affinity(cache).mapPartitionToPrimaryAndBackups(i);
+
+            assertFalse(F.isEmpty(nodes));
+            assertFalse(F.size(nodes) > backups + 1);
+
+            String attrVal = F.first(nodes).attribute(SPLIT_ATTRIBUTE_NAME);
+
+            partToAttr.put(i, attrVal);
+
+            for (ClusterNode node : nodes)
+                assertEquals(attrVal, node.attribute(SPLIT_ATTRIBUTE_NAME));
+        }
+
+        return partToAttr;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
index 2c6c206..4508db8 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunctionExcludeNeighborsSelfTest.java
@@ -17,16 +17,78 @@
 
 package org.apache.ignite.cache.affinity.rendezvous;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionExcludeNeighborsAbstractSelfTest;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.junit.Test;
 
 /**
  * Tests exclude neighbors flag for rendezvous affinity function.
  */
 public class RendezvousAffinityFunctionExcludeNeighborsSelfTest extends
     AffinityFunctionExcludeNeighborsAbstractSelfTest {
+    /** With backup filter flag. */
+    private boolean withBackupFilter;
+
     /** {@inheritDoc} */
     @Override protected AffinityFunction affinityFunction() {
-        return new RendezvousAffinityFunction(true);
+        if (withBackupFilter) {
+            return new RendezvousAffinityFunction(true)
+                    .setAffinityBackupFilter((c, l) -> backupFilter(l.get(0)) == backupFilter(c));
+        }
+        else
+            return new RendezvousAffinityFunction(true);
+    }
+
+    /** Checks that exclude neighbors flag and backup filter works together. */
+    @Test
+    public void testAffinityWithBackupFilter() throws Exception {
+        int grids = 9;
+        withBackupFilter = true;
+        int copies = backups + 1;
+
+        try {
+            startGrids(grids);
+
+            awaitPartitionMapExchange();
+
+            Affinity<Integer> aff = grid(0).affinity(DEFAULT_CACHE_NAME);
+
+            for (int i = 0; i < aff.partitions(); i++) {
+                Collection<ClusterNode> affNodes = aff.mapKeyToPrimaryAndBackups(i);
+
+                assertEquals(copies, affNodes.size());
+
+                Set<String> macs = new HashSet<>();
+
+                long backupFilterKey = -1L;
+
+                for (ClusterNode node : affNodes) {
+                    macs.add(node.attribute(IgniteNodeAttributes.ATTR_MACS));
+
+                    if (backupFilterKey < 0)
+                        backupFilterKey = backupFilter(node);
+                    else
+                        assertEquals(backupFilterKey, backupFilter(node));
+                }
+
+                assertEquals(copies, macs.size());
+            }
+        }
+        finally {
+            withBackupFilter = false;
+
+            stopAllGrids();
+        }
+    }
+
+    /** Value for filtering nodes by backup filter. */
+    private static long backupFilter(ClusterNode node) {
+        return node.order() % 3;
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
index c585528..ae3a0c9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
@@ -255,8 +255,6 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
 
         AffinityFunction aff = new RendezvousAffinityFunction(parts, null);
 
-        getTestResources().inject(aff);
-
         GridByteArrayWrapper[] keys = new GridByteArrayWrapper[512];
         Random rnd = new Random();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
index b0e7df4..b9768c2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite2.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.List;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilterSelfTest;
+import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilterSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest;
@@ -168,6 +169,7 @@ public class IgniteCacheMvccTestSuite2 {
         ignoredTests.add(GridCachePartitionedProjectionAffinitySelfTest.class);
         ignoredTests.add(RendezvousAffinityFunctionBackupFilterSelfTest.class);
         ignoredTests.add(ClusterNodeAttributeAffinityBackupFilterSelfTest.class);
+        ignoredTests.add(ClusterNodeAttributeColocatedBackupFilterSelfTest.class);
         ignoredTests.add(NonAffinityCoordinatorDynamicStartStopTest.class);
 
         ignoredTests.add(NoneRebalanceModeSelfTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index bdb174a..87ef591 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilterSelfTest;
+import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilterSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionFastPowerOfTwoHashSelfTest;
@@ -378,6 +379,7 @@ public class IgniteCacheTestSuite2 {
 
         GridTestUtils.addTestIfNeeded(suite, RendezvousAffinityFunctionBackupFilterSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeAffinityBackupFilterSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, ClusterNodeAttributeColocatedBackupFilterSelfTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, CachePartitionStateTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheComparatorTest.class, ignoredTests);