You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "alievmirza (via GitHub)" <gi...@apache.org> on 2023/01/31 07:46:45 UTC

[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1572: IGNITE-18088 Trigger rebalance on zone.dataNodes change.

alievmirza commented on code in PR #1572:
URL: https://github.com/apache/ignite-3/pull/1572#discussion_r1091530288


##########
modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java:
##########
@@ -51,32 +49,32 @@ public class RendezvousAffinityFunctionTest {
 
     @Test
     public void testPartitionDistribution() {
-        int nodes = 50;
+        int nodeCount = 50;
 
         int parts = 10_000;
 
         int replicas = 4;
 
-        List<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+        List<String> nodes = prepareNetworkTopology(nodeCount);
 
-        assertTrue(parts > nodes, "Partitions should be more that nodes");
+        assertTrue(parts > nodeCount, "Partitions should be more that nodes");

Review Comment:
   Typo: "more that nodes" should be "more than nodes".



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItDistributionZonesTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  network: {\n"
+            + "    port:{},\n"
+            + "    nodeFinder:{\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  raft.rpcInstallSnapshotTimeout: 10000"
+            + "}";
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    @Timeout(60)
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    @Test
+    @Disabled
+    void assingmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+        cluster.startAndInit(4);
+
+        createTestTable();
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2)
+        )));
+
+        TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+
+        BinaryRowEx key = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
+
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        putData();
+
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        try {
+            table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get();
+
+            fail();
+        } catch (Exception e) {
+            assertInstanceOf(ExecutionException.class, e);
+
+            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+        }
+
+        cluster.knockOutNode(2, PARTITION_NETWORK);
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 3)
+        )));
+
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get());
+
+        cluster.reanimateNode(2, PARTITION_NETWORK);
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2)
+        )));
+
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        try {
+            table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get();
+
+            fail();
+        } catch (Exception e) {
+            assertInstanceOf(ExecutionException.class, e);
+
+            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+        }
+    }
+
+    private boolean waitAssingments(List<Set<Integer>> nodes) throws InterruptedException {

Review Comment:
   Typo: please rename this method to `waitAssignments`.
   
   Could you please write a javadoc explaining what is going on in this method?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -425,11 +431,48 @@ public TableManager(
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG)
         );
+
+        zonesWatchListener = new WatchListener() {
+            @Override
+            public void onUpdate(@NotNull WatchEvent evt) {

Review Comment:
   We do not use the `@NotNull` annotation in this project; please remove it.
   



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItDistributionZonesTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  network: {\n"
+            + "    port:{},\n"
+            + "    nodeFinder:{\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  raft.rpcInstallSnapshotTimeout: 10000"
+            + "}";
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    @Timeout(60)
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    @Test
+    @Disabled
+    void assingmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+        cluster.startAndInit(4);
+
+        createTestTable();
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2)
+        )));
+
+        TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+
+        BinaryRowEx key = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
+
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        putData();
+
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        try {
+            table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get();
+
+            fail();
+        } catch (Exception e) {
+            assertInstanceOf(ExecutionException.class, e);
+
+            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+        }
+
+        cluster.knockOutNode(2, PARTITION_NETWORK);
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 3)
+        )));
+
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get());
+
+        cluster.reanimateNode(2, PARTITION_NETWORK);
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2)
+        )));
+
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        try {
+            table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get();
+
+            fail();
+        } catch (Exception e) {
+            assertInstanceOf(ExecutionException.class, e);
+
+            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+        }
+    }
+
+    private boolean waitAssingments(List<Set<Integer>> nodes) throws InterruptedException {
+        return waitForCondition(() -> {
+            for (int i = 0; i < nodes.size(); i++) {
+                Set<Integer> excpectedAssignments = nodes.get(i);

Review Comment:
   Typo: please rename this variable to `expectedAssignments`.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {

Review Comment:
   Should this test extend a base test class such as `BaseIgniteAbstractTest`? At the very least, `NODE_BOOTSTRAP_CFG` could be reused from there.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItDistributionZonesTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  network: {\n"
+            + "    port:{},\n"
+            + "    nodeFinder:{\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  raft.rpcInstallSnapshotTimeout: 10000"
+            + "}";
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    @Timeout(60)
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    @Test
+    @Disabled

Review Comment:
   Why is this test disabled without a reference to a tracking ticket?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -425,11 +431,48 @@ public TableManager(
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG)
         );
+
+        zonesWatchListener = new WatchListener() {
+            @Override
+            public void onUpdate(@NotNull WatchEvent evt) {
+                NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = tablesCfg.tables();
+
+                int zoneId = extractZoneId(evt.entryEvent().newEntry().key());
+
+                Set<String> nodesIds = ByteUtils.fromBytes(evt.entryEvent().newEntry().value());
+
+                for (int i = 0; i < tables.value().size(); i++) {
+                    TableView tableView = tables.value().get(i);
+
+                    int tableZoneId = tableView.zoneId();
+
+                    if (zoneId == tableZoneId) {
+                        TableConfiguration tableCfg = tables.get(tableView.name());
+
+                        for (int part = 0; part < tableView.partitions(); part++) {
+                            UUID tableId = ((ExtendedTableConfiguration) tableCfg).id().value();
+
+                            TablePartitionId replicaGrpId = new TablePartitionId(tableId, part);
+
+                            updatePendingAssignmentsKeys(tableView.name(), replicaGrpId, nodesIds, tableView.replicas(),
+                                    evt.entryEvent().newEntry().revision(), metaStorageMgr, part);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void onError(@NotNull Throwable e) {
+                LOG.warn("Unable to process stable assignments event", e);

Review Comment:
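   This is not a stable assignments event: this listener handles updates of a distribution zone's data nodes, so the log message should say that instead.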



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItDistributionZonesTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  network: {\n"
+            + "    port:{},\n"
+            + "    nodeFinder:{\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  raft.rpcInstallSnapshotTimeout: 10000"
+            + "}";
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    @Timeout(60)
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    @Test
+    @Disabled
+    void assingmentsChangingOnNodeLeaveNodeJoin() throws Exception {

Review Comment:
   Typo: `assingments` should be `assignments`.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItDistributionZonesTest.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.Cluster.NodeKnockout.PARTITION_NETWORK;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test suite for rebalance process.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(90)
+public class ItDistributionZonesTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItDistributionZonesTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  network: {\n"
+            + "    port:{},\n"
+            + "    nodeFinder:{\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  raft.rpcInstallSnapshotTimeout: 10000"
+            + "}";
+
+    @WorkDirectory
+    private Path workDir;
+
+    private Cluster cluster;
+
+    @BeforeEach
+    void createCluster(TestInfo testInfo) {
+        cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+    }
+
+    @AfterEach
+    @Timeout(60)
+    void shutdownCluster() {
+        cluster.shutdown();
+    }
+
+    @Test
+    @Disabled
+    void assingmentsChangingOnNodeLeaveNodeJoin() throws Exception {
+        cluster.startAndInit(4);
+
+        createTestTable();
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2),
+                Set.of(0, 1, 2)
+        )));
+
+        TableImpl table = (TableImpl) cluster.node(0).tables().table("TEST");
+
+        BinaryRowEx key = new TupleMarshallerImpl(table.schemaView()).marshal(Tuple.create().set("id", 1));
+
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        putData();
+
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(0).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(1).node()).get());
+        assertNotNull(table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(2).node()).get());
+
+        try {
+            table.internalTable().get(key, new HybridClockImpl().now(), cluster.node(3).node()).get();
+
+            fail();
+        } catch (Exception e) {
+            assertInstanceOf(ExecutionException.class, e);
+
+            assertInstanceOf(ReplicaUnavailableException.class, e.getCause());
+        }
+
+        cluster.knockOutNode(2, PARTITION_NETWORK);
+
+        assertTrue(waitAssingments(List.of(
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 3),
+                Set.of(0, 1, 2),

Review Comment:
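   Why do we still expect `Set.of(0, 1, 2)` here? Node 2 has just been knocked out (`cluster.knockOutNode(2, PARTITION_NETWORK)`), so I would expect it to be excluded from this partition's assignments as well.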



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -425,11 +431,48 @@ public TableManager(
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "incoming-raft-snapshot", LOG)
         );
+
+        zonesWatchListener = new WatchListener() {
+            @Override
+            public void onUpdate(@NotNull WatchEvent evt) {
+                NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = tablesCfg.tables();
+
+                int zoneId = extractZoneId(evt.entryEvent().newEntry().key());
+
+                Set<String> nodesIds = ByteUtils.fromBytes(evt.entryEvent().newEntry().value());
+
+                for (int i = 0; i < tables.value().size(); i++) {
+                    TableView tableView = tables.value().get(i);
+
+                    int tableZoneId = tableView.zoneId();
+
+                    if (zoneId == tableZoneId) {
+                        TableConfiguration tableCfg = tables.get(tableView.name());
+
+                        for (int part = 0; part < tableView.partitions(); part++) {
+                            UUID tableId = ((ExtendedTableConfiguration) tableCfg).id().value();
+
+                            TablePartitionId replicaGrpId = new TablePartitionId(tableId, part);
+
+                            updatePendingAssignmentsKeys(tableView.name(), replicaGrpId, nodesIds, tableView.replicas(),
+                                    evt.entryEvent().newEntry().revision(), metaStorageMgr, part);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void onError(@NotNull Throwable e) {

Review Comment:
   NotNull



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistrubutionZoneTest.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.emptySet;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.command.info.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.dsl.If;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.utils.RebalanceUtil;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+
+/**
+ * Tests the distribution zone watch listener in {@link TableManager}.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class TableManagerDistrubutionZoneTest extends IgniteAbstractTest {

Review Comment:
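   Typo in the class name: `Distrubution` should be `Distribution` (i.e. `TableManagerDistributionZoneTest`).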



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org