Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/01/26 09:45:42 UTC

[GitHub] [ignite] ascherbakoff commented on a change in pull request #8668: IGNITE-13560 Add node attribute colocated affinity backup filter

ascherbakoff commented on a change in pull request #8668:
URL: https://github.com/apache/ignite/pull/8668#discussion_r564338368



##########
File path: modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * This class can be used as a {@link RendezvousAffinityFunction#affinityBackupFilter } to create
+ * cache templates in Spring that force each partition's primary and backup to be co-located on nodes with the same
+ * attribute value.
+ * <p>
+ *
+ * Partition copies co-location can be helpful to group nodes into cells when fixed baseline topology is used. If all
+ * copies of each partition are located inside only one cell, in case of {@code backup + 1} nodes leave the cluster
+ * there will be data lost only if all leaving nodes belong to the same cell. Without partition copies co-location
+ * within a cell, most probably there will be data lost if any {@code backup + 1} nodes leave the cluster.
+ *
+ * Note: Baseline topology change can lead to inter-cell partitions migration, i.e. rebalance can affect all copies
+ * of some partitions even if only one node is changed in the baseline topology.
+ * <p>
+ *
+ * This implementation will discard backups rather than place copies on nodes with different attribute values. This
+ * avoids trying to cram more data onto remaining nodes when some have failed.
+ * <p>
+ * A node attribute to compare is provided on construction.
+ *
+ * Note: All cluster nodes, on startup, automatically register all the environment and system properties as node
+ * attributes.
+ *
+ * Note: Node attributes persisted in baseline topology at the time of baseline topology change. If the co-location
+ * attribute of some node was updated, but the baseline topology wasn't changed, the outdated attribute value can be
+ * used by the backup filter when this node left the cluster. To avoid this, the baseline topology should be updated
+ * after changing the co-location attribute.
+ * <p>
+ * This class is constructed with a node attribute name, and a candidate node will be rejected if previously selected
+ * nodes for a partition have a different value for attribute on the candidate node.
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * Create a partitioned cache template plate with 1 backup, where the backup will be placed in the same cell
+ * as the primary.   Note: This example requires that the environment variable "CELL" be set appropriately on
+ * each node via some means external to Ignite.
+ * <pre name="code" class="xml">
+ * &lt;property name="cacheConfiguration"&gt;
+ *     &lt;list&gt;
+ *         &lt;bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration"&gt;
+ *             &lt;property name="name" value="JobcaseDefaultCacheConfig*"/&gt;
+ *             &lt;property name="cacheMode" value="PARTITIONED" /&gt;
+ *             &lt;property name="backups" value="1" /&gt;
+ *             &lt;property name="affinity"&gt;
+ *                 &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"&gt;
+ *                     &lt;property name="affinityBackupFilter"&gt;
+ *                         &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter"&gt;
+ *                             &lt;!-- Backups must go to the same CELL as primary --&gt;
+ *                             &lt;constructor-arg value="CELL" /&gt;
+ *                         &lt;/bean&gt;
+ *                     &lt;/property&gt;
+ *                 &lt;/bean&gt;
+ *             &lt;/property&gt;
+ *         &lt;/bean&gt;
+ *     &lt;/list&gt;
+ * &lt;/property&gt;
+ * </pre>
+ * <p>
+ */
+public class ClusterNodeAttributeColocatedBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    /** Attribute name. */
+    private final String attrName;
+
+    /**
+     * @param attrName The attribute name for the attribute to compare.
+     */
+    public ClusterNodeAttributeColocatedBackupFilter(String attrName) {
+        this.attrName = attrName;
+    }
+
+    /**
+     * Defines a predicate which returns {@code true} if a node is acceptable for a backup
+     * or {@code false} otherwise. An acceptable node is one where its attribute value
+     * is exact match with previously selected nodes.  If an attribute does not
+     * exist on exactly one node of a pair, then the attribute does not match.  If the attribute
+     * does not exist both nodes of a pair, then the attribute matches.
+     *
+     * @param candidate          A node that is a candidate for becoming a backup node for a partition.
+     * @param previouslySelected A list of primary/backup nodes already chosen for a partition.
+     *                           The primary is first.
+     */
+    @Override public boolean apply(ClusterNode candidate, List<ClusterNode> previouslySelected) {
+        for (ClusterNode node : previouslySelected)
+            return Objects.equals(candidate.attribute(attrName), node.attribute(attrName));
+
+        return true;
+    }

Review comment:
       This can be simplified to
   `return Objects.equals(candidate.attribute(attrName), previouslySelected.get(0).attribute(attrName));`
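
A minimal sketch of why the two forms agree, assuming (as the Javadoc above states) that the list always starts with the primary and is therefore non-empty. `AttrNode`, `FirstNodeComparison`, and the `node` helper are hypothetical stand-ins introduced only for illustration; they are not part of Ignite. The loop returns on its first iteration, so it only ever inspects element 0. The one behavioural difference is the empty list, where the loop form returns `true` and a bare `get(0)` would throw, so the sketch keeps an explicit guard.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for ClusterNode: exposes only an attribute lookup. */
interface AttrNode {
    Object attribute(String name);
}

final class FirstNodeComparison {
    /** Loop form from the diff: returns on the first iteration, so only element 0 is compared. */
    static boolean loopForm(AttrNode candidate, List<AttrNode> previouslySelected, String attrName) {
        for (AttrNode node : previouslySelected)
            return Objects.equals(candidate.attribute(attrName), node.attribute(attrName));

        return true;
    }

    /** Direct form: the empty check (an assumption of this sketch) preserves the loop's behaviour for an empty list. */
    static boolean directForm(AttrNode candidate, List<AttrNode> previouslySelected, String attrName) {
        if (previouslySelected.isEmpty())
            return true;

        return Objects.equals(candidate.attribute(attrName), previouslySelected.get(0).attribute(attrName));
    }

    /** Builds a node whose only attribute, "CELL", has the given value (may be null). */
    static AttrNode node(String cell) {
        return name -> "CELL".equals(name) ? cell : null;
    }
}
```

If the caller guarantees that the primary is always present, the guard can be dropped and the method reduces to the one-line `return` suggested above.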

##########
File path: modules/core/src/test/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilterSelfTest.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionBackupFilterAbstractSelfTest;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Partitioned affinity test.
+ */
+public class ClusterNodeAttributeColocatedBackupFilterSelfTest extends AffinityFunctionBackupFilterAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunction() {
+        return affinityFunctionWithAffinityBackupFilter(SPLIT_ATTRIBUTE_NAME);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected AffinityFunction affinityFunctionWithAffinityBackupFilter(String attrName) {
+        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false);
+
+        aff.setAffinityBackupFilter(new ClusterNodeAttributeColocatedBackupFilter(attrName));
+
+        return aff;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPartitionsWithAffinityBackupFilter() {
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+        int partCnt = aff.partitions();
+
+        int iter = grid(0).cluster().nodes().size() / 4;
+
+        IgniteCache<Object, Object> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < partCnt; i++) {
+            Collection<ClusterNode> nodes = affinity(cache).mapKeyToPrimaryAndBackups(i);
+
+            Map<String, Integer> stat = getAttributeStatistic(nodes);
+
+            if (stat.get(FIRST_NODE_GROUP) > 0) {
+                assertEquals((Integer)Math.min(backups + 1, iter * 2), stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)0, stat.get("B"));
+                assertEquals((Integer)0, stat.get("C"));
+            }
+            else if (stat.get("B") > 0) {
+                assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)iter, stat.get("B"));
+                assertEquals((Integer)0, stat.get("C"));
+            }
+            else if (stat.get("C") > 0) {
+                assertEquals((Integer)0, stat.get(FIRST_NODE_GROUP));
+                assertEquals((Integer)0, stat.get("B"));
+                assertEquals((Integer)iter, stat.get("C"));
+            }
+            else
+                fail("Unexpected partition assignment");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPartitions() throws Exception {
+        int iter = grid(0).cluster().nodes().size() / 2;
+
+        AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+        Map<Integer, String> partToAttr = partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+        assertTrue(F.exist(partToAttr.values(), "A"::equals));
+        assertTrue(F.exist(partToAttr.values(), "B"::equals));
+        assertFalse(F.exist(partToAttr.values(), v -> !"A".equals(v) && !"B".equals(v)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testPartitionDistributionWithAffinityBackupFilter() throws Exception {
+        backups = 2;
+
+        super.testPartitionDistributionWithAffinityBackupFilter();
+    }
+
+    /** */
+    @Test
+    public void testBackupFilterWithBaseline() throws Exception {
+        backups = 1;
+
+        try {
+            startGrid(0, "A");
+            startGrid(1, "B");
+            startGrid(2, "C");
+
+            startGrid(3, "A");
+            startGrid(4, "B");
+            startGrid(5, "C");
+
+            awaitPartitionMapExchange();
+
+            AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+            Map<Integer, String> partToAttr = partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+            grid(0).cluster().baselineAutoAdjustEnabled(false);
+
+            // Check that we have the same distribution if some BLT nodes are offline.
+            stopGrid(3);
+            stopGrid(4);
+            stopGrid(5);
+
+            awaitPartitionMapExchange();
+
+            assertEquals(partToAttr, partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+            // Check that not BLT nodes do not affect distribution.
+            startGrid(6, "D");
+
+            awaitPartitionMapExchange();
+
+            assertEquals(partToAttr, partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+
+            // Check that distribution is recalculated after BLT change.
+            long topVer = grid(0).cluster().topologyVersion();
+
+            grid(0).cluster().setBaselineTopology(topVer);
+
+            // Wait for rebalance and assignment change to ideal assignment.
+            assertTrue(GridTestUtils.waitForCondition(() -> F.eq(grid(0).context().discovery().topologyVersionEx(),
+                    new AffinityTopologyVersion(topVer, 2)), 5_000L));
+
+            assertNotEquals(partToAttr, partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions()));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** */
+    @Test
+    public void testBackupFilterWithNullAttribute() throws Exception {
+        backups = 1;
+
+        try {
+            startGrid(0, "A");
+            startGrid(1, "A");
+            startGrid(2, (String)null);
+            startGrid(3, (String)null);
+
+            awaitPartitionMapExchange();
+
+            AffinityFunction aff = cacheConfiguration(grid(0).configuration(), DEFAULT_CACHE_NAME).getAffinity();
+
+            Map<Integer, String> partToAttr = partToAttr(grid(0).cache(DEFAULT_CACHE_NAME), aff.partitions());
+
+            assertTrue(F.exist(partToAttr.values(), Objects::isNull));
+            assertTrue(F.exist(partToAttr.values(), "A"::equals));
+            assertFalse(F.exist(partToAttr.values(), v -> !(v == null) && !"A".equals(v)));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Determine split attribute value for each partition and check that this value is the same for all nodes for
+     * this partition.
+     */
+    private Map<Integer, String> partToAttr(IgniteCache<Object, Object> cache, int partCnt) {

Review comment:
       Abbreviations in method names are not allowed by the code style.

##########
File path: modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/ClusterNodeAttributeColocatedBackupFilter.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity.rendezvous;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * This class can be used as a {@link RendezvousAffinityFunction#affinityBackupFilter } to create
+ * cache templates in Spring that force each partition's primary and backup to be co-located on nodes with the same
+ * attribute value.
+ * <p>
+ *
+ * Partition copies co-location can be helpful to group nodes into cells when fixed baseline topology is used. If all
+ * copies of each partition are located inside only one cell, in case of {@code backup + 1} nodes leave the cluster
+ * there will be data lost only if all leaving nodes belong to the same cell. Without partition copies co-location
+ * within a cell, most probably there will be data lost if any {@code backup + 1} nodes leave the cluster.
+ *
+ * Note: Baseline topology change can lead to inter-cell partitions migration, i.e. rebalance can affect all copies
+ * of some partitions even if only one node is changed in the baseline topology.
+ * <p>
+ *
+ * This implementation will discard backups rather than place copies on nodes with different attribute values. This
+ * avoids trying to cram more data onto remaining nodes when some have failed.
+ * <p>
+ * A node attribute to compare is provided on construction.
+ *
+ * Note: All cluster nodes, on startup, automatically register all the environment and system properties as node
+ * attributes.
+ *
+ * Note: Node attributes persisted in baseline topology at the time of baseline topology change. If the co-location
+ * attribute of some node was updated, but the baseline topology wasn't changed, the outdated attribute value can be
+ * used by the backup filter when this node left the cluster. To avoid this, the baseline topology should be updated
+ * after changing the co-location attribute.
+ * <p>
+ * This class is constructed with a node attribute name, and a candidate node will be rejected if previously selected
+ * nodes for a partition have a different value for attribute on the candidate node.
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * Create a partitioned cache template plate with 1 backup, where the backup will be placed in the same cell
+ * as the primary.   Note: This example requires that the environment variable "CELL" be set appropriately on
+ * each node via some means external to Ignite.
+ * <pre name="code" class="xml">
+ * &lt;property name="cacheConfiguration"&gt;
+ *     &lt;list&gt;
+ *         &lt;bean id="cache-template-bean" abstract="true" class="org.apache.ignite.configuration.CacheConfiguration"&gt;
+ *             &lt;property name="name" value="JobcaseDefaultCacheConfig*"/&gt;
+ *             &lt;property name="cacheMode" value="PARTITIONED" /&gt;
+ *             &lt;property name="backups" value="1" /&gt;
+ *             &lt;property name="affinity"&gt;
+ *                 &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"&gt;
+ *                     &lt;property name="affinityBackupFilter"&gt;
+ *                         &lt;bean class="org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeColocatedBackupFilter"&gt;
+ *                             &lt;!-- Backups must go to the same CELL as primary --&gt;
+ *                             &lt;constructor-arg value="CELL" /&gt;
+ *                         &lt;/bean&gt;
+ *                     &lt;/property&gt;
+ *                 &lt;/bean&gt;
+ *             &lt;/property&gt;
+ *         &lt;/bean&gt;
+ *     &lt;/list&gt;
+ * &lt;/property&gt;
+ * </pre>
+ * <p>
+ */
+public class ClusterNodeAttributeColocatedBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
+    /** */
+    private static final long serialVersionUID = 1L;
+
+    /** Attribute name. */
+    private final String attrName;
+
+    /**
+     * @param attrName The attribute name for the attribute to compare.
+     */
+    public ClusterNodeAttributeColocatedBackupFilter(String attrName) {
+        this.attrName = attrName;
+    }
+
+    /**
+     * Defines a predicate which returns {@code true} if a node is acceptable for a backup
+     * or {@code false} otherwise. An acceptable node is one where its attribute value
+     * is exact match with previously selected nodes.  If an attribute does not

Review comment:
       I strongly suggest denying null attributes in this filter and validating that the attribute value exists. Otherwise, data can be lost due to loss of owners if a node is restarted without the attribute.
   
   Checking that the value is not null, and throwing an error to trigger the failure handler when it is absent, will do.
   
   The Javadoc should be updated to state that the attribute is required.
   
   Even better would be to replace the attribute filter closure with something like this, for correct attribute validation:
   ```
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import org.apache.ignite.cluster.ClusterNode;
   import org.apache.ignite.internal.util.typedef.internal.A;
   import org.apache.ignite.lang.IgniteBiPredicate;
   
   /**
    * A basic class for node attribute aware backup filter implementations.
    * Listed attributes are required to be configured on new cluster node joining or first activation.
    */
   public abstract class NodeAttributeAwareBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> {
       /** Attribute names. */
       protected final Set<String> attributeNames;
   
       /**
        * @param attributeNames The list of attribute names for the set of attributes to compare. Must be at least one.
        */
       protected NodeAttributeAwareBackupFilter(String... attributeNames) {
           A.ensure(attributeNames.length > 0, "attributeNames.length > 0");
   
           this.attributeNames = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(attributeNames)));
       }
   
       /**
        * @return The set of used cluster node attributes for this filter.
        */
       public Set<String> attributeNames() {
           return attributeNames;
       }
   }
   ```
   
   Ideally, attribute values for the backup filter should be persisted in the node's metastore to avoid accidental changes (made without a direct user request), or perhaps persisted in the cache configuration after cache creation.
   
   These improvements can be done in separate tickets.
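
The null check suggested above could look roughly like the following sketch. It is not the PR's implementation: `AttributedNode` and `StrictColocatedFilter` are hypothetical stand-ins so that the example compiles without Ignite on the classpath, and `IllegalStateException` is an assumed choice of error. How such an exception would reach the failure handler depends on where the filter is invoked and is not shown here.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for ClusterNode: exposes only an attribute lookup. */
interface AttributedNode {
    Object attribute(String name);
}

/** Co-location filter sketch that rejects nodes on which the attribute is missing. */
final class StrictColocatedFilter {
    /** Name of the attribute that every participating node must define. */
    private final String attrName;

    StrictColocatedFilter(String attrName) {
        this.attrName = Objects.requireNonNull(attrName, "attrName");
    }

    /** Accepts the candidate only if its attribute equals that of every previously selected node. */
    boolean apply(AttributedNode candidate, List<AttributedNode> previouslySelected) {
        Object candidateVal = requiredAttribute(candidate);

        for (AttributedNode node : previouslySelected) {
            if (!candidateVal.equals(requiredAttribute(node)))
                return false;
        }

        return true;
    }

    /** Returns the attribute value, or fails loudly when it is absent instead of treating null as a valid cell. */
    private Object requiredAttribute(AttributedNode node) {
        Object val = node.attribute(attrName);

        if (val == null)
            throw new IllegalStateException("Required node attribute is not set: " + attrName);

        return val;
    }
}
```

Failing on a missing attribute, rather than letting two attribute-less nodes match each other, is the design choice argued for above: a node restarted without the attribute is reported as an error instead of silently forming its own cell.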



