Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/30 11:22:27 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1461: IGNITE-18079 Integrate RAFT streaming snapshots

ibessonov commented on code in PR #1461:
URL: https://github.com/apache/ignite-3/pull/1461#discussion_r1059290852


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   I wonder why we don't use the configuration extension here. It would require only a single value, with the configuration given as a string, for example.



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java:
##########
@@ -77,7 +82,13 @@ private void init(int port) throws IOException {
 
         this.clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);
 
-        Loza raftManager = new Loza(clusterService, null, workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);
+        ConfigurationValue<Integer> rpcInstallSnapshotTimeutValue = mock(ConfigurationValue.class);

Review Comment:
   Typo in `timeut`



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutNodeIndices = new ConcurrentHashSet<>();

Review Comment:
   I know I suck at grammar, but shouldn't it be "knockedOutNode**s**Indices"?



##########
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java:
##########
@@ -20,13 +20,18 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Raft configuration schema.
  */
 @SuppressWarnings("PMD.UnusedPrivateField")
 @ConfigurationRoot(rootName = "raft", type = ConfigurationType.LOCAL)
 public class RaftConfigurationSchema {
+    /** RPC Timeout for InstallSnapshot request (in milliseconds). */
+    @Value(hasDefault = true)
+    public int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

Review Comment:
   5 minutes is definitely not long enough. What timeout do we use in Ignite 2.x?
   JRaft was designed for very small state machines; in that case 5 minutes would have been reasonable. The TODO is good, thank you!



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return nodes.get(index);
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {

Review Comment:
   Maybe `aliveNode`? We use this word quite often in Ignite 2.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutNodeIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return Objects.requireNonNull(nodes.get(index));
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutNodeIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        checkNodeIndex(index);
+
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);
+
+        return newIgniteNode;
+    }
+
+    private void checkNodeIndex(int index) {
+        if (index < 0) {
+            throw new IllegalArgumentException("Index cannot be negative");
+        }
+        if (index >= nodes.size()) {
+            throw new IllegalArgumentException("Cluster only contains " + nodes.size() + " nodes, but node with index "
+                    + index + " was tried to be accessed");
+        }
+    }
+
+    /**
+     * Stops a node by index.
+     *
+     * @param index Node index in the cluster.
+     */
+    public void stopNode(int index) {
+        checkNodeIndex(index);
+
+        IgnitionManager.stop(nodes.get(index).name());
+
+        nodes.set(index, null);
+    }
+
+    /**
+     * Restarts a node by index.
+     *
+     * @param index Node index in the cluster.
+     * @return New node.
+     */
+    public IgniteImpl restartNode(int index) {
+        stopNode(index);
+
+        return startNode(index);
+    }
+
+    /**
+     * Returns {@link RaftGroupService} that is the leader for the given table partition.
+     *
+     * @param tablePartitionId Table partition ID for which a leader is to be found.
+     * @return {@link RaftGroupService} that is the leader for the given table partition.
+     * @throws InterruptedException Thrown if interrupted while waiting for the leader to be found.
+     */
+    public RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+        AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+        assertTrue(
+                waitForCondition(() -> {
+                    RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                    serviceRef.set(service);
+
+                    return service != null;
+                }, 10_000),
+                "Did not find a leader for " + tablePartitionId + " in time"
+        );
+
+        RaftGroupService result = serviceRef.get();
+
+        assertNotNull(result);
+
+        return result;
+    }
+
+    @Nullable
+    private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+        return aliveNodes()
+                .map(IgniteImpl.class::cast)
+                .map(ignite -> {
+                    JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                    Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                            .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                            .findAny();
+
+                    return maybeRaftNodeId.map(server::raftGroupService).orElse(null);
+                })
+                .filter(Objects::nonNull)

Review Comment:
   Or, just use "limit(1)" on the original stream and don't use Optional at all. `flatMap` is a cool thing!
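
A minimal sketch of what this suggestion might look like, written against plain Java types so it can run on its own. `Server`, `findService`, and the string-based group ids are hypothetical stand-ins, not the Ignite `JraftServerImpl` API. The inner `limit(1)` keeps at most one match per server, and `flatMap` drops servers with no match, so no per-node `Optional` and no `filter(Objects::nonNull)` are needed.

```java
import java.util.List;
import java.util.Optional;

class RaftServiceLookup {
    /** Hypothetical stand-in for a RAFT server that hosts several groups; not an Ignite class. */
    static final class Server {
        final String name;
        final List<String> groupIds;

        Server(String name, List<String> groupIds) {
            this.name = name;
            this.groupIds = groupIds;
        }
    }

    /** Returns "server/group" for the first server that hosts the given group, if any. */
    static Optional<String> findService(List<Server> servers, String groupId) {
        return servers.stream()
                // Each server contributes zero or one element, so nulls never enter the stream.
                .flatMap(server -> server.groupIds.stream()
                        .filter(groupId::equals)
                        .limit(1)
                        .map(id -> server.name + "/" + id))
                .findFirst();
    }

    public static void main(String[] args) {
        List<Server> servers = List.of(
                new Server("n1", List.of("g1")),
                new Server("n2", List.of("g2", "g3")),
                new Server("n3", List.of("g2")));

        System.out.println(findService(servers, "g2"));
        System.out.println(findService(servers, "missing"));
    }
}
```

The only `Optional` left is the one returned by the terminal `findFirst()`; a caller that needs a nullable result could finish with `orElse(null)`.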



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java:
##########
@@ -59,7 +61,13 @@ public void testLozaStop() throws Exception {
         Mockito.doReturn(mock(MessagingService.class)).when(clusterNetSvc).messagingService();
         Mockito.doReturn(mock(TopologyService.class)).when(clusterNetSvc).topologyService();
 
-        Loza loza = new Loza(clusterNetSvc, mock(RaftConfiguration.class), workDir, new HybridClockImpl());
+        RaftConfiguration raftConfiguration = mock(RaftConfiguration.class);

Review Comment:
   Same question: maybe we should use the configuration extension?



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -201,6 +201,11 @@ private CompletableFuture<NetworkMessage> invoke0(ClusterNode recipient, Network
             return failedFuture(new NodeStoppingException());
         }
 
+        BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;

Review Comment:
   Is this test-only functionality in the production code? What the hell is happening here =)
   Please add a big fat comment and maybe a TODO with an issue to fix this.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.NodeKnockout;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests how RAFT snapshots installation works for table partitions.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(WorkDirectoryExtension.class)
+@Timeout(60)
+// TODO: IGNITE-18465- extend AbstractClusterIntegrationTest
+class ItTableRaftSnapshotsTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
+
+    /**
+     * Nodes bootstrap configuration pattern.
+     *
+     * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
+     * allow tests pass thanks to retries.
+     */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  \"network\": {\n"

Review Comment:
   I recommend removing quotes where possible. The HOCON format allows it, and the code will look cleaner. The number of lines can be reduced as well by joining some values, like `raft.rpcInstallSnapshotTimeout = 10000`, but it's up to you. Personally, I don't like it when text occupies too many lines.
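
A rough sketch of how the constant might look with that advice applied. Only `network` and `raft.rpcInstallSnapshotTimeout = 10000` come from the diff and the comment above; the keys under `network` are assumptions added for illustration, and the `{}` markers stand for the formatter placeholders that are substituted before the text is parsed as HOCON.

```java
class HoconBootstrapConfig {
    // Unquoted keys and the dotted path form are both permitted by HOCON.
    // The keys under "network" are assumed for illustration only.
    // "{}" marks a placeholder filled in before parsing (port, then seed node address).
    static final String NODE_BOOTSTRAP_CFG = "{\n"
            + "  network: {\n"
            + "    port: {},\n"
            + "    nodeFinder.netClusterNodes: [ {} ]\n"
            + "  },\n"
            + "  raft.rpcInstallSnapshotTimeout = 10000\n"
            + "}";

    public static void main(String[] args) {
        System.out.println(NODE_BOOTSTRAP_CFG);
    }
}
```

Joining `raft` and `rpcInstallSnapshotTimeout` into one dotted path replaces a three-line nested object with a single line, which is the line-count reduction mentioned in the comment.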



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {

Review Comment:
   That's an interesting point. You're correct technically, but it doesn't feel right. I won't ask you to move classes though, that's really pointless for `runner`



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Cluster of nodes used for testing.
+ */
+@SuppressWarnings("resource")
+public class Cluster {
+    private static final IgniteLogger LOG = Loggers.forClass(Cluster.class);
+
+    /** Base port number. */
+    private static final int BASE_PORT = 3344;
+
+    private static final String CONNECT_NODE_ADDR = "\"localhost:" + BASE_PORT + '\"';
+
+    /** Timeout for SQL queries (in milliseconds). */
+    private static final int QUERY_TIMEOUT_MS = 10_000;
+
+    private final TestInfo testInfo;
+
+    private final Path workDir;
+
+    private final String nodeBootstrapConfig;
+
+    /** Cluster nodes. */
+    private final List<IgniteImpl> nodes = new CopyOnWriteArrayList<>();
+
+    private volatile boolean started = false;
+
+    /** Indices of nodes that have been knocked out. */
+    private final Set<Integer> knockedOutNodeIndices = new ConcurrentHashSet<>();
+
+    /**
+     * Creates a new instance.
+     */
+    public Cluster(TestInfo testInfo, Path workDir, String nodeBootstrapConfig) {
+        this.testInfo = testInfo;
+        this.workDir = workDir;
+        this.nodeBootstrapConfig = nodeBootstrapConfig;
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     */
+    public void startAndInit(int nodeCount) {
+        if (started) {
+            throw new IllegalStateException("The cluster is already started");
+        }
+
+        List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
+                .mapToObj(this::startClusterNode)
+                .collect(toList());
+
+        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+
+        IgnitionManager.init(metaStorageAndCmgNodeName, List.of(metaStorageAndCmgNodeName), "cluster");
+
+        for (CompletableFuture<IgniteImpl> future : futures) {
+            assertThat(future, willCompleteSuccessfully());
+
+            nodes.add(future.join());
+        }
+
+        started = true;
+    }
+
+    private CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(nodeBootstrapConfig, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
+
+        return IgnitionManager.start(nodeName, config, workDir.resolve(nodeName))
+                .thenApply(IgniteImpl.class::cast);
+    }
+
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public IgniteImpl node(int index) {
+        return Objects.requireNonNull(nodes.get(index));
+    }
+
+    /**
+     * Returns a node that is not stopped and not knocked out (so it can be used to interact with the cluster).
+     */
+    public IgniteImpl entryNode() {
+        return IntStream.range(0, nodes.size())
+                .filter(index -> nodes.get(index) != null)
+                .filter(index -> !knockedOutNodeIndices.contains(index))
+                .mapToObj(nodes::get)
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException("There is no single alive node that would not be knocked out"));
+    }
+
+    /**
+     * Starts a new node with the given index.
+     *
+     * @param index Node index.
+     * @return Started node (if the cluster is already initialized, the node is returned when it joins the cluster; if it
+     *     is not initialized, the node is returned in a state in which it is ready to join the cluster).
+     */
+    public IgniteImpl startNode(int index) {
+        checkNodeIndex(index);
+
+        IgniteImpl newIgniteNode;
+
+        try {
+            newIgniteNode = startClusterNode(index).get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+
+        nodes.set(index, newIgniteNode);
+
+        return newIgniteNode;
+    }
+
+    private void checkNodeIndex(int index) {
+        if (index < 0) {
+            throw new IllegalArgumentException("Index cannot be negative");
+        }
+        if (index >= nodes.size()) {
+            throw new IllegalArgumentException("Cluster only contains " + nodes.size() + " nodes, but node with index "
+                    + index + " was tried to be accessed");
+        }
+    }
+
+    /**
+     * Stops a node by index.
+     *
+     * @param index Node index in the cluster.
+     */
+    public void stopNode(int index) {
+        checkNodeIndex(index);
+
+        IgnitionManager.stop(nodes.get(index).name());
+
+        nodes.set(index, null);
+    }
+
+    /**
+     * Restarts a node by index.
+     *
+     * @param index Node index in the cluster.
+     * @return New node.
+     */
+    public IgniteImpl restartNode(int index) {
+        stopNode(index);
+
+        return startNode(index);
+    }
+
+    /**
+     * Returns {@link RaftGroupService} that is the leader for the given table partition.
+     *
+     * @param tablePartitionId Table partition ID for which a leader is to be found.
+     * @return {@link RaftGroupService} that is the leader for the given table partition.
+     * @throws InterruptedException Thrown if interrupted while waiting for the leader to be found.
+     */
+    public RaftGroupService leaderServiceFor(TablePartitionId tablePartitionId) throws InterruptedException {
+        AtomicReference<RaftGroupService> serviceRef = new AtomicReference<>();
+
+        assertTrue(
+                waitForCondition(() -> {
+                    RaftGroupService service = currentLeaderServiceFor(tablePartitionId);
+
+                    serviceRef.set(service);
+
+                    return service != null;
+                }, 10_000),
+                "Did not find a leader for " + tablePartitionId + " in time"
+        );
+
+        RaftGroupService result = serviceRef.get();
+
+        assertNotNull(result);
+
+        return result;
+    }
+
+    @Nullable
+    private RaftGroupService currentLeaderServiceFor(TablePartitionId tablePartitionId) {
+        return aliveNodes()
+                .map(IgniteImpl.class::cast)
+                .map(ignite -> {
+                    JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+
+                    Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
+                            .filter(nodeId -> nodeId.groupId().equals(tablePartitionId))
+                            .findAny();
+
+                    return maybeRaftNodeId.map(server::raftGroupService).orElse(null);
+                })
+                .filter(Objects::nonNull)

Review Comment:
   You could have used `Optional#stream` together with `flatMap`; that way you wouldn't have to filter out null values or use `orElse`. Maybe that's what you should do, but I don't insist.
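
   A minimal sketch of the two shapes, assuming Java 9+ for `Optional#stream`. Plain strings stand in for the PR's types, and `findGroup`, the class name, and the sample data are hypothetical illustrations, not code from the PR:

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

public class OptionalFlatMapSketch {
    // Hypothetical stand-in for the "find the matching local node" lookup:
    // returns the group hosted by a node, or empty if the node hosts none.
    static Optional<String> findGroup(Map<String, String> groupsByNode, String node) {
        return Optional.ofNullable(groupsByNode.get(node));
    }

    // Shape used in the diff: unwrap with orElse(null), then filter out the nulls.
    static List<String> withNullFilter(List<String> nodes, Map<String, String> groupsByNode) {
        return nodes.stream()
                .map(node -> findGroup(groupsByNode, node).map(String::toUpperCase).orElse(null))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    // Suggested shape: Optional#stream yields zero or one element, so flatMap
    // drops the empty results without any null handling.
    static List<String> withOptionalStream(List<String> nodes, Map<String, String> groupsByNode) {
        return nodes.stream()
                .flatMap(node -> findGroup(groupsByNode, node).map(String::toUpperCase).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("node0", "node1", "node2");
        Map<String, String> groupsByNode = Map.of("node0", "groupA", "node2", "groupB");

        // Both variants produce the same list; only the null handling differs.
        System.out.println(withNullFilter(nodes, groupsByNode));
        System.out.println(withOptionalStream(nodes, groupsByNode));
    }
}
```

   Both variants return the same elements in the same order; the second simply never materializes a null in the stream.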


