You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/11/13 09:58:08 UTC
(ignite-3) branch main updated: IGNITE-20833 Switch ItTableRaftSnapshotsTest to KV API (#2833)
This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ca2dfd115a IGNITE-20833 Switch ItTableRaftSnapshotsTest to KV API (#2833)
ca2dfd115a is described below
commit ca2dfd115a366f91d117ed56b3dbf00171091a12
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Nov 13 13:58:03 2023 +0400
IGNITE-20833 Switch ItTableRaftSnapshotsTest to KV API (#2833)
---
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 222 +++++----------------
1 file changed, 46 insertions(+), 176 deletions(-)
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 82cd9629a7..c9ed4e4b64 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -18,25 +18,24 @@
package org.apache.ignite.internal.raftsnapshot;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import java.net.ConnectException;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -46,17 +45,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
import java.util.stream.IntStream;
-import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -64,8 +56,6 @@ import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
-import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
-import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
@@ -75,8 +65,6 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.apache.ignite.internal.testframework.log4j2.LogInspector.Handler;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -95,11 +83,11 @@ import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
-import org.apache.ignite.sql.ResultSet;
-import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -167,107 +155,6 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
cluster.shutdown();
}
- /**
- * Executes the given action, retrying it up to a few times if a transient failure occurs (like node unavailability) or
- * until {@code shouldStop} returns {@code true}, in that case this method throws {@link UnableToRetry} exception.
- */
- private static <T> T withRetry(Supplier<T> action, Predicate<RuntimeException> shouldStop) {
- // The following allows to retry for up to 16 seconds (we need so much time to account
- // for a node restart).
- int maxAttempts = 10;
- float backoffFactor = 1.3f;
- int sleepMillis = 500;
-
- for (int attempt = 1; attempt <= maxAttempts; attempt++) {
- try {
- return action.get();
- } catch (RuntimeException e) {
- if (shouldStop.test(e)) {
- throw new UnableToRetry(e);
- }
- if (attempt < maxAttempts && isTransientFailure(e)) {
- LOG.warn("Attempt {} failed, going to retry", e, attempt);
- } else {
- LOG.error("Attempt {} failed, not going to retry anymore, rethrowing", e, attempt);
-
- throw e;
- }
- }
-
- try {
- Thread.sleep(sleepMillis);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- fail("Interrupted while waiting for next attempt");
- }
-
- //noinspection NumericCastThatLosesPrecision
- sleepMillis = (int) (sleepMillis * backoffFactor);
- }
-
- throw new AssertionError("Should not reach here");
- }
-
- /**
- * Executes the given UPDATE/INSERT statement until it succeed or receives duplicate key error.
- */
- private void executeDmlWithRetry(int nodeIndex, String statement) {
- // We should retry a DML statement until we either succeed or receive a duplicate key error.
- // The number of attempts is bounded because we know that node is going to recover.
- Predicate<RuntimeException> stopOnDuplicateKeyError = (e) -> {
- if (e instanceof IgniteException) {
- IgniteException ie = (IgniteException) e;
- return ie.code() == Sql.CONSTRAINT_VIOLATION_ERR;
- } else {
- return false;
- }
- };
-
- try {
- withRetry(() -> {
- cluster.doInSession(nodeIndex, session -> {
- executeUpdate(statement, session);
- });
- return null;
- }, stopOnDuplicateKeyError);
-
- } catch (UnableToRetry ignore) {
- // Duplicate key exception was caught.
- }
- }
-
- private static boolean isTransientFailure(RuntimeException e) {
- return hasCause(e, ReplicationTimeoutException.class, null)
- || hasCause(e, IgniteInternalException.class, "Failed to send message to node")
- || hasCause(e, IgniteInternalCheckedException.class, "Failed to execute query, node left")
- || hasCause(e, SqlValidatorException.class, "Object 'TEST' not found")
- // TODO: remove after https://issues.apache.org/jira/browse/IGNITE-18848 is implemented.
- || hasCause(e, StorageRebalanceException.class, "process of rebalancing")
- || hasCause(e, ConnectException.class, null);
- }
-
- private <T> T queryWithRetry(int nodeIndex, String sql, Function<ResultSet<SqlRow>, T> extractor) {
- Predicate<RuntimeException> retryForever = (e) -> false;
- // TODO: IGNITE-18423 remove this retry machinery when the networking bug is fixed as replication timeout seems to be caused by it.
- return withRetry(() -> cluster.query(nodeIndex, sql, extractor), retryForever);
- }
-
- /**
- * Reads all rows from TEST table.
- */
- private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet<SqlRow> rs) {
- List<IgniteBiTuple<Integer, String>> rows = new ArrayList<>();
-
- while (rs.hasNext()) {
- SqlRow sqlRow = rs.next();
-
- rows.add(new IgniteBiTuple<>(sqlRow.intValue(0), sqlRow.stringValue(1)));
- }
-
- return rows;
- }
-
/**
* Tests that a leader successfully feeds a follower with a RAFT snapshot on any of the supported storage engines.
*/
@@ -291,9 +178,11 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
transferLeadershipOnSolePartitionTo(2);
- List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
+ assertThat(getFromNode(2, 1), is("one"));
+ }
- assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+ private @Nullable String getFromNode(int clusterNode, int key) {
+ return tableViewAt(clusterNode).get(null, key);
}
private void feedNode2WithSnapshotOfOneRow() throws InterruptedException {
@@ -347,12 +236,25 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
knockoutNode(2);
- executeDmlWithRetry(0, "insert into test(key, val) values (1, 'one')");
+ putToNode(0, 1, "one");
// Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
causeLogTruncationOnSolePartitionLeader(0);
}
+ private void putToNode(int nodeIndex, int key, String value) {
+ putToNode(nodeIndex, key, value, null);
+ }
+
+ private void putToNode(int nodeIndex, int key, String value, @Nullable Transaction tx) {
+ tableViewAt(nodeIndex).put(tx, key, value);
+ }
+
+ private KeyValueView<Integer, String> tableViewAt(int nodeIndex) {
+ Table table = cluster.node(nodeIndex).tables().table("test");
+ return table.keyValueView(Integer.class, String.class);
+ }
+
private void knockoutNode(int nodeIndex) {
cluster.stopNode(nodeIndex);
@@ -480,13 +382,11 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
Transaction tx = cluster.node(0).transactions().begin();
- cluster.doInSession(0, session -> {
- executeUpdate("insert into test(key, val) values (1, 'one')", session, tx);
+ putToNode(0, 1, "one", tx);
- knockoutNode(2);
+ knockoutNode(2);
- tx.commit();
- });
+ tx.commit();
// Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
causeLogTruncationOnSolePartitionLeader(0);
@@ -495,9 +395,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
transferLeadershipOnSolePartitionTo(2);
- List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
-
- assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+ assertThat(getFromNode(2, 1), is("one"));
}
/**
@@ -507,17 +405,12 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
void entriesKeepAppendedAfterSnapshotInstallation() throws Exception {
feedNode2WithSnapshotOfOneRow();
- // this should be possibly replaced with executeDmlWithRetry.
- cluster.doInSession(0, session -> {
- executeUpdate("insert into test(key, val) values (2, 'two')", session);
- });
+ putToNode(0, 2, "two");
transferLeadershipOnSolePartitionTo(2);
- List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(0, "select * from test order by key",
- ItTableRaftSnapshotsTest::readRows);
-
- assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+ assertThat(getFromNode(0, 1), is("one"));
+ assertThat(getFromNode(0, 2), is("two"));
}
/**
@@ -534,12 +427,9 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
AtomicInteger lastLoadedKey = new AtomicInteger();
CompletableFuture<?> loadingFuture = IgniteTestUtils.runAsync(() -> {
- for (int i = 2; !installedSnapshot.get(); i++) {
- int key = i;
- cluster.doInSession(0, session -> {
- executeUpdate("insert into test(key, val) values (" + key + ", 'extra')", session);
- lastLoadedKey.set(key);
- });
+ for (int key = 2; !installedSnapshot.get(); key++) {
+ putToNode(0, key, "extra");
+ lastLoadedKey.set(key);
}
});
@@ -551,10 +441,14 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
transferLeadershipOnSolePartitionTo(2);
- List<Integer> keys = queryWithRetry(2, "select * from test order by key", ItTableRaftSnapshotsTest::readRows)
- .stream().map(IgniteBiTuple::get1).collect(toList());
+ assertThat(getFromNode(2, 1), is("one"));
+
+ List<Integer> expectedKeysAndNextKey = IntStream.rangeClosed(2, lastLoadedKey.get() + 1).boxed().collect(toList());
+ Map<Integer, String> keysToValues = tableViewAt(2).getAll(null, expectedKeysAndNextKey);
- assertThat(keys, equalTo(IntStream.rangeClosed(1, lastLoadedKey.get()).boxed().collect(toList())));
+ Set<Integer> expectedKeys = IntStream.rangeClosed(2, lastLoadedKey.get()).boxed().collect(toSet());
+ assertThat(keysToValues.keySet(), equalTo(expectedKeys));
+ assertThat(keysToValues.values(), everyItem(is("extra")));
}
/**
@@ -573,9 +467,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
knockoutNode(0);
- cluster.doInSession(2, session -> {
- executeUpdate("insert into test(key, val) values (2, 'two')", session);
- });
+ putToNode(2, 2, "two");
// Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
causeLogTruncationOnSolePartitionLeader(2);
@@ -584,10 +476,8 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
transferLeadershipOnSolePartitionTo(0);
- List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(0, "select * from test order by key",
- ItTableRaftSnapshotsTest::readRows);
-
- assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+ assertThat(getFromNode(0, 1), is("one"));
+ assertThat(getFromNode(0, 2), is("two"));
}
/**
@@ -717,9 +607,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
assertThat(installSnapshotSuccessfulFuture, willSucceedIn(1, TimeUnit.MINUTES));
// Make sure the rebalancing is complete.
- List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
-
- assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+ assertThat(getFromNode(2, 1), is("one"));
}
/**
@@ -773,10 +661,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
// Change the leader to node 1.
transferLeadershipOnSolePartitionTo(1);
- boolean replicated = waitForCondition(() -> {
- List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
- return rows.size() == 1;
- }, 20_000);
+ boolean replicated = waitForCondition(() -> getFromNode(2, 1) != null, 20_000);
assertTrue(replicated, "Data has not been replicated to node 2 in time");
@@ -784,9 +669,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
assertFalse(installSnapshotSuccessfulFuture.isDone());
// Make sure the rebalancing is complete.
- List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
-
- assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+ assertThat(getFromNode(2, 1), is("one"));
}
/**
@@ -898,19 +781,6 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
return blockingProcessor;
}
- /**
- * This exception is thrown to indicate that an operation can not possibly succeed after some error condition.
- * For example there is no reason to retry an operation that inserts a certain key after receiving a duplicate key error.
- */
- private static final class UnableToRetry extends RuntimeException {
-
- private static final long serialVersionUID = -504618429083573198L;
-
- private UnableToRetry(Throwable cause) {
- super(cause);
- }
- }
-
/**
* {@link AppendEntriesRequestProcessor} that, when blocking is enabled, blocks all AppendEntriesRequests of
* the given group (that is, returns EBUSY error code, which makes JRaft repeat them).