You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/25 10:39:43 UTC

[ignite-3] 01/01: IGNITE-17574 Fix assertion error ECATCHUP expected

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17574
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 425daec3c1324021b5e0ff76cf16339910c1725a
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Aug 25 14:39:32 2022 +0400

    IGNITE-17574 Fix assertion error ECATCHUP expected
---
 .../testframework/util/DirectExecutor.java         | 101 +++++++++++++++++++++
 .../raft/RebalanceRaftGroupEventsListener.java     |  11 ++-
 .../raft/RebalanceRaftGroupEventsListenerTest.java |  76 ++++++++++++++++
 3 files changed, 185 insertions(+), 3 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
new file mode 100644
index 0000000000..84dbdf4a15
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
@@ -0,0 +1,101 @@
+package org.apache.ignite.internal.testframework.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jetbrains.annotations.NotNull;
+
+/** Executor service that runs each task synchronously in the thread that submitted it. */
+public class DirectExecutor implements ExecutorService {
+    private final AtomicBoolean isShutdown = new AtomicBoolean();
+
+    @Override
+    public void shutdown() {
+        isShutdown.set(true);
+    }
+
+    @NotNull
+    @Override
+    public List<Runnable> shutdownNow() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return isShutdown.get();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return isShutdown.get();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
+        return true;
+    }
+
+    @NotNull
+    @Override
+    public <T> Future<T> submit(@NotNull Callable<T> task) {
+        T result;
+        try {
+            result = task.call();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return CompletableFuture.completedFuture(result);
+    }
+
+    @NotNull
+    @Override
+    public <T> Future<T> submit(@NotNull Runnable task, T result) {
+        task.run();
+        return CompletableFuture.completedFuture(result);
+    }
+
+    @NotNull
+    @Override
+    public Future<?> submit(@NotNull Runnable task) {
+        task.run();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @NotNull
+    @Override
+    public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @NotNull
+    @Override
+    public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
+            throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @NotNull
+    @Override
+    public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void execute(@NotNull Runnable command) {
+        command.run();
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index bd562157e5..58e3b37953 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -191,14 +191,19 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
         }
 
         try {
-            if (status == null) {
-                // leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
+            assert status != null;
+
+            RaftError raftError = status.getRaftError();
+
+            if (raftError == RaftError.EPERM && "Leader stepped down.".equals(status.getErrorMsg())) {
+                // Leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
                 LOG.info("Leader stepped down during rebalance [partId={}]", partId);
 
                 return;
             }
 
-            assert status.getRaftError() == RaftError.ECATCHUP : "According to the JRaft protocol, RaftError.ECATCHUP is expected.";
+            assert raftError == RaftError.ECATCHUP : "According to the JRaft protocol, " + RaftError.ECATCHUP
+                    + " is expected, got " + raftError;
 
             LOG.debug("Error occurred during rebalance [partId={}]", partId);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
new file mode 100644
index 0000000000..bb729bb6cd
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
@@ -0,0 +1,76 @@
+package org.apache.ignite.internal.table.distributed.raft;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.util.DirectExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class RebalanceRaftGroupEventsListenerTest {
+    /**
+     * Tests that {@link RebalanceRaftGroupEventsListener} correctly handles the situation when
+     * {@code ConfigurationCtx#reset} is called with a {@code null} status.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    void onReconfigurationErrorCalledFromResetWithNullStatus() throws Exception {
+        IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+        RaftGroupEventsListener spy = Mockito.spy(new RebalanceRaftGroupEventsListener(null, null,null, 0, busyLock, null, null));
+
+        NodeImpl node = Mockito.mock(NodeImpl.class);
+
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setCommonExecutor(new DirectExecutor());
+        nodeOptions.setRaftGrpEvtsLsnr(spy);
+
+        when(node.getOptions()).thenReturn(nodeOptions);
+
+        Class<?> confCtxClass = Class.forName("org.apache.ignite.raft.jraft.core.NodeImpl$ConfigurationCtx");
+
+        Constructor<?> constructor = confCtxClass.getDeclaredConstructor(NodeImpl.class);
+        constructor.setAccessible(true);
+
+        // ConfigurationCtx object.
+        Object confCtx = constructor.newInstance(node);
+
+        var resultFuture = new CompletableFuture<Status>();
+
+        IgniteTestUtils.setFieldValue(confCtx, "done", (Closure) resultFuture::complete);
+
+        Method resetMethod = confCtxClass.getDeclaredMethod("reset", Status.class);
+        resetMethod.setAccessible(true);
+
+        // Execute the reset method with a null status.
+        resetMethod.invoke(confCtx, new Object[]{null});
+
+        Status defaultStatus = new Status(RaftError.EPERM, "Leader stepped down.");
+
+        // onReconfigurationError should be called with the default status rather than with a null status.
+        verify(spy, times(1)).onReconfigurationError(eq(defaultStatus), any(), anyLong());
+
+        // The future should already be done because execution happens in the same thread.
+        assertTrue(resultFuture.isDone());
+        assertThat(resultFuture, willBe(defaultStatus));
+    }
+}
\ No newline at end of file