You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/11/13 09:58:08 UTC

(ignite-3) branch main updated: IGNITE-20833 Switch ItTableRaftSnapshotsTest to KV API (#2833)

This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ca2dfd115a IGNITE-20833 Switch ItTableRaftSnapshotsTest to KV API (#2833)
ca2dfd115a is described below

commit ca2dfd115a366f91d117ed56b3dbf00171091a12
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Nov 13 13:58:03 2023 +0400

    IGNITE-20833 Switch ItTableRaftSnapshotsTest to KV API (#2833)
---
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     | 222 +++++----------------
 1 file changed, 46 insertions(+), 176 deletions(-)

diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 82cd9629a7..c9ed4e4b64 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -18,25 +18,24 @@
 package org.apache.ignite.internal.raftsnapshot;
 
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
-import java.net.ConnectException;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -46,17 +45,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.IntStream;
-import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -64,8 +56,6 @@ import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
-import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
-import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
@@ -75,8 +65,6 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.log4j2.LogInspector;
 import org.apache.ignite.internal.testframework.log4j2.LogInspector.Handler;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -95,11 +83,11 @@ import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
 import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
-import org.apache.ignite.sql.ResultSet;
-import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -167,107 +155,6 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
         cluster.shutdown();
     }
 
-    /**
-     * Executes the given action, retrying it up to a few times if a transient failure occurs (like node unavailability) or
-     * until {@code shouldStop} returns {@code true}, in that case this method throws {@link UnableToRetry} exception.
-     */
-    private static <T> T withRetry(Supplier<T> action, Predicate<RuntimeException> shouldStop) {
-        // The following allows to retry for up to 16 seconds (we need so much time to account
-        // for a node restart).
-        int maxAttempts = 10;
-        float backoffFactor = 1.3f;
-        int sleepMillis = 500;
-
-        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-            try {
-                return action.get();
-            } catch (RuntimeException e) {
-                if (shouldStop.test(e)) {
-                    throw new UnableToRetry(e);
-                }
-                if (attempt < maxAttempts && isTransientFailure(e)) {
-                    LOG.warn("Attempt {} failed, going to retry", e, attempt);
-                } else {
-                    LOG.error("Attempt {} failed, not going to retry anymore, rethrowing", e, attempt);
-
-                    throw e;
-                }
-            }
-
-            try {
-                Thread.sleep(sleepMillis);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                fail("Interrupted while waiting for next attempt");
-            }
-
-            //noinspection NumericCastThatLosesPrecision
-            sleepMillis = (int) (sleepMillis * backoffFactor);
-        }
-
-        throw new AssertionError("Should not reach here");
-    }
-
-    /**
-     * Executes the given UPDATE/INSERT statement until it succeed or receives duplicate key error.
-     */
-    private void executeDmlWithRetry(int nodeIndex, String statement) {
-        // We should retry a DML statement until we either succeed or receive a duplicate key error.
-        // The number of attempts is bounded because we know that node is going to recover.
-        Predicate<RuntimeException> stopOnDuplicateKeyError = (e) -> {
-            if (e instanceof IgniteException) {
-                IgniteException ie = (IgniteException) e;
-                return ie.code() == Sql.CONSTRAINT_VIOLATION_ERR;
-            } else {
-                return false;
-            }
-        };
-
-        try {
-            withRetry(() -> {
-                cluster.doInSession(nodeIndex, session -> {
-                    executeUpdate(statement, session);
-                });
-                return null;
-            }, stopOnDuplicateKeyError);
-
-        } catch (UnableToRetry ignore) {
-            // Duplicate key exception was caught.
-        }
-    }
-
-    private static boolean isTransientFailure(RuntimeException e) {
-        return hasCause(e, ReplicationTimeoutException.class, null)
-                || hasCause(e, IgniteInternalException.class, "Failed to send message to node")
-                || hasCause(e, IgniteInternalCheckedException.class, "Failed to execute query, node left")
-                || hasCause(e, SqlValidatorException.class, "Object 'TEST' not found")
-                // TODO: remove after https://issues.apache.org/jira/browse/IGNITE-18848 is implemented.
-                || hasCause(e, StorageRebalanceException.class, "process of rebalancing")
-                || hasCause(e, ConnectException.class, null);
-    }
-
-    private <T> T queryWithRetry(int nodeIndex, String sql, Function<ResultSet<SqlRow>, T> extractor) {
-        Predicate<RuntimeException> retryForever = (e) -> false;
-        // TODO: IGNITE-18423 remove this retry machinery when the networking bug is fixed as replication timeout seems to be caused by it.
-        return withRetry(() -> cluster.query(nodeIndex, sql, extractor), retryForever);
-    }
-
-    /**
-     * Reads all rows from TEST table.
-     */
-    private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet<SqlRow> rs) {
-        List<IgniteBiTuple<Integer, String>> rows = new ArrayList<>();
-
-        while (rs.hasNext()) {
-            SqlRow sqlRow = rs.next();
-
-            rows.add(new IgniteBiTuple<>(sqlRow.intValue(0), sqlRow.stringValue(1)));
-        }
-
-        return rows;
-    }
-
     /**
      * Tests that a leader successfully feeds a follower with a RAFT snapshot on any of the supported storage engines.
      */
@@ -291,9 +178,11 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
 
         transferLeadershipOnSolePartitionTo(2);
 
-        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
+        assertThat(getFromNode(2, 1), is("one"));
+    }
 
-        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    private @Nullable String getFromNode(int clusterNode, int key) {
+        return tableViewAt(clusterNode).get(null, key);
     }
 
     private void feedNode2WithSnapshotOfOneRow() throws InterruptedException {
@@ -347,12 +236,25 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
 
         knockoutNode(2);
 
-        executeDmlWithRetry(0, "insert into test(key, val) values (1, 'one')");
+        putToNode(0, 1, "one");
 
         // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
         causeLogTruncationOnSolePartitionLeader(0);
     }
 
+    private void putToNode(int nodeIndex, int key, String value) {
+        putToNode(nodeIndex, key, value, null);
+    }
+
+    private void putToNode(int nodeIndex, int key, String value, @Nullable Transaction tx) {
+        tableViewAt(nodeIndex).put(tx, key, value);
+    }
+
+    private KeyValueView<Integer, String> tableViewAt(int nodeIndex) {
+        Table table = cluster.node(nodeIndex).tables().table("test");
+        return table.keyValueView(Integer.class, String.class);
+    }
+
     private void knockoutNode(int nodeIndex) {
         cluster.stopNode(nodeIndex);
 
@@ -480,13 +382,11 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
 
         Transaction tx = cluster.node(0).transactions().begin();
 
-        cluster.doInSession(0, session -> {
-            executeUpdate("insert into test(key, val) values (1, 'one')", session, tx);
+        putToNode(0, 1, "one", tx);
 
-            knockoutNode(2);
+        knockoutNode(2);
 
-            tx.commit();
-        });
+        tx.commit();
 
         // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
         causeLogTruncationOnSolePartitionLeader(0);
@@ -495,9 +395,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
 
         transferLeadershipOnSolePartitionTo(2);
 
-        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
-
-        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+        assertThat(getFromNode(2, 1), is("one"));
     }
 
     /**
@@ -507,17 +405,12 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
     void entriesKeepAppendedAfterSnapshotInstallation() throws Exception {
         feedNode2WithSnapshotOfOneRow();
 
-        // this should be possibly replaced with executeDmlWithRetry.
-        cluster.doInSession(0, session -> {
-            executeUpdate("insert into test(key, val) values (2, 'two')", session);
-        });
+        putToNode(0, 2, "two");
 
         transferLeadershipOnSolePartitionTo(2);
 
-        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(0, "select * from test order by key",
-                ItTableRaftSnapshotsTest::readRows);
-
-        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+        assertThat(getFromNode(0, 1), is("one"));
+        assertThat(getFromNode(0, 2), is("two"));
     }
 
     /**
@@ -534,12 +427,9 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
         AtomicInteger lastLoadedKey = new AtomicInteger();
 
         CompletableFuture<?> loadingFuture = IgniteTestUtils.runAsync(() -> {
-            for (int i = 2; !installedSnapshot.get(); i++) {
-                int key = i;
-                cluster.doInSession(0, session -> {
-                    executeUpdate("insert into test(key, val) values (" + key + ", 'extra')", session);
-                    lastLoadedKey.set(key);
-                });
+            for (int key = 2; !installedSnapshot.get(); key++) {
+                putToNode(0, key, "extra");
+                lastLoadedKey.set(key);
             }
         });
 
@@ -551,10 +441,14 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
 
         transferLeadershipOnSolePartitionTo(2);
 
-        List<Integer> keys = queryWithRetry(2, "select * from test order by key", ItTableRaftSnapshotsTest::readRows)
-                .stream().map(IgniteBiTuple::get1).collect(toList());
+        assertThat(getFromNode(2, 1), is("one"));
+
+        List<Integer> expectedKeysAndNextKey = IntStream.rangeClosed(2, lastLoadedKey.get() + 1).boxed().collect(toList());
+        Map<Integer, String> keysToValues = tableViewAt(2).getAll(null, expectedKeysAndNextKey);
 
-        assertThat(keys, equalTo(IntStream.rangeClosed(1, lastLoadedKey.get()).boxed().collect(toList())));
+        Set<Integer> expectedKeys = IntStream.rangeClosed(2, lastLoadedKey.get()).boxed().collect(toSet());
+        assertThat(keysToValues.keySet(), equalTo(expectedKeys));
+        assertThat(keysToValues.values(), everyItem(is("extra")));
     }
 
     /**
@@ -573,9 +467,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
 
         knockoutNode(0);
 
-        cluster.doInSession(2, session -> {
-            executeUpdate("insert into test(key, val) values (2, 'two')", session);
-        });
+        putToNode(2, 2, "two");
 
         // Make sure AppendEntries from leader to follower is impossible, making the leader to use InstallSnapshot.
         causeLogTruncationOnSolePartitionLeader(2);
@@ -584,10 +476,8 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
 
         transferLeadershipOnSolePartitionTo(0);
 
-        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(0, "select * from test order by key",
-                ItTableRaftSnapshotsTest::readRows);
-
-        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"), new IgniteBiTuple<>(2, "two"))));
+        assertThat(getFromNode(0, 1), is("one"));
+        assertThat(getFromNode(0, 2), is("two"));
     }
 
     /**
@@ -717,9 +607,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
         assertThat(installSnapshotSuccessfulFuture, willSucceedIn(1, TimeUnit.MINUTES));
 
         // Make sure the rebalancing is complete.
-        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
-
-        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+        assertThat(getFromNode(2, 1), is("one"));
     }
 
     /**
@@ -773,10 +661,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
         // Change the leader to node 1.
         transferLeadershipOnSolePartitionTo(1);
 
-        boolean replicated = waitForCondition(() -> {
-            List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
-            return rows.size() == 1;
-        }, 20_000);
+        boolean replicated = waitForCondition(() -> getFromNode(2, 1) != null, 20_000);
 
         assertTrue(replicated, "Data has not been replicated to node 2 in time");
 
@@ -784,9 +669,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
         assertFalse(installSnapshotSuccessfulFuture.isDone());
 
         // Make sure the rebalancing is complete.
-        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
-
-        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+        assertThat(getFromNode(2, 1), is("one"));
     }
 
     /**
@@ -898,19 +781,6 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
         return blockingProcessor;
     }
 
-    /**
-     * This exception is thrown to indicate that an operation can not possibly succeed after some error condition.
-     * For example there is no reason to retry an operation that inserts a certain key after receiving a duplicate key error.
-     */
-    private static final class UnableToRetry extends RuntimeException {
-
-        private static final long serialVersionUID = -504618429083573198L;
-
-        private UnableToRetry(Throwable cause) {
-            super(cause);
-        }
-    }
-
     /**
      * {@link AppendEntriesRequestProcessor} that, when blocking is enabled, blocks all AppendEntriesRequests of
      * the given group (that is, returns EBUSY error code, which makes JRaft repeat them).