You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/25 10:39:42 UTC

[ignite-3] branch ignite-17574 created (now 425daec3c1)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-17574
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at 425daec3c1 IGNITE-17574 Fix assertion error ECATCHUP expected

This branch includes the following new commits:

     new 425daec3c1 IGNITE-17574 Fix assertion error ECATCHUP expected

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/01: IGNITE-17574 Fix assertion error ECATCHUP expected

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-17574
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 425daec3c1324021b5e0ff76cf16339910c1725a
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Aug 25 14:39:32 2022 +0400

    IGNITE-17574 Fix assertion error ECATCHUP expected
---
 .../testframework/util/DirectExecutor.java         | 101 +++++++++++++++++++++
 .../raft/RebalanceRaftGroupEventsListener.java     |  11 ++-
 .../raft/RebalanceRaftGroupEventsListenerTest.java |  76 ++++++++++++++++
 3 files changed, 185 insertions(+), 3 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
new file mode 100644
index 0000000000..84dbdf4a15
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
@@ -0,0 +1,101 @@
+package org.apache.ignite.internal.testframework.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jetbrains.annotations.NotNull;
+
+/** Executor service that executes tasks within the thread task was submitted in. */
+public class DirectExecutor implements ExecutorService {
+    private final AtomicBoolean isShutdown = new AtomicBoolean();
+
+    @Override
+    public void shutdown() {
+        isShutdown.set(true);
+    }
+
+    @NotNull
+    @Override
+    public List<Runnable> shutdownNow() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return isShutdown.get();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return isShutdown.get();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
+        return true;
+    }
+
+    @NotNull
+    @Override
+    public <T> Future<T> submit(@NotNull Callable<T> task) {
+        T result;
+        try {
+            result = task.call();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return CompletableFuture.completedFuture(result);
+    }
+
+    @NotNull
+    @Override
+    public <T> Future<T> submit(@NotNull Runnable task, T result) {
+        task.run();
+        return CompletableFuture.completedFuture(result);
+    }
+
+    @NotNull
+    @Override
+    public Future<?> submit(@NotNull Runnable task) {
+        task.run();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @NotNull
+    @Override
+    public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @NotNull
+    @Override
+    public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
+            throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @NotNull
+    @Override
+    public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void execute(@NotNull Runnable command) {
+        command.run();
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index bd562157e5..58e3b37953 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -191,14 +191,19 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
         }
 
         try {
-            if (status == null) {
-                // leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
+            assert status != null;
+
+            RaftError raftError = status.getRaftError();
+
+            if (raftError == RaftError.EPERM && "Leader stepped down.".equals(status.getErrorMsg())) {
+                // Leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
                 LOG.info("Leader stepped down during rebalance [partId={}]", partId);
 
                 return;
             }
 
-            assert status.getRaftError() == RaftError.ECATCHUP : "According to the JRaft protocol, RaftError.ECATCHUP is expected.";
+            assert raftError == RaftError.ECATCHUP : "According to the JRaft protocol, " + RaftError.ECATCHUP
+                    + " is expected, got " + raftError;
 
             LOG.debug("Error occurred during rebalance [partId={}]", partId);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
new file mode 100644
index 0000000000..bb729bb6cd
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
@@ -0,0 +1,76 @@
+package org.apache.ignite.internal.table.distributed.raft;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.util.DirectExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class RebalanceRaftGroupEventsListenerTest {
+    /**
+     * Tests that {@link RebalanceRaftGroupEventsListener} handles correctly a situation when
+     * ConfigurationCtx#reset is called with null status.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    void onReconfigurationErrorCalledFromResetWithNullStatus() throws Exception {
+        IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+        RaftGroupEventsListener spy = Mockito.spy(new RebalanceRaftGroupEventsListener(null, null,null, 0, busyLock, null, null));
+
+        NodeImpl node = Mockito.mock(NodeImpl.class);
+
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setCommonExecutor(new DirectExecutor());
+        nodeOptions.setRaftGrpEvtsLsnr(spy);
+
+        when(node.getOptions()).thenReturn(nodeOptions);
+
+        Class<?> confCtxClass = Class.forName("org.apache.ignite.raft.jraft.core.NodeImpl$ConfigurationCtx");
+
+        Constructor<?> constructor = confCtxClass.getDeclaredConstructor(NodeImpl.class);
+        constructor.setAccessible(true);
+
+        // ConfigurationCtx object.
+        Object confCtx = constructor.newInstance(node);
+
+        var resultFuture = new CompletableFuture<Status>();
+
+        IgniteTestUtils.setFieldValue(confCtx, "done", (Closure) resultFuture::complete);
+
+        Method resetMethod = confCtxClass.getDeclaredMethod("reset", Status.class);
+        resetMethod.setAccessible(true);
+
+        // Execute reset method with null status
+        resetMethod.invoke(confCtx, new Object[]{null});
+
+        Status defaultStatus = new Status(RaftError.EPERM, "Leader stepped down.");
+
+        // onReconfigurationError should not be called with null status but rather with a default status.
+        verify(spy, times(1)).onReconfigurationError(eq(defaultStatus), any(), anyLong());
+
+        // Future should be already done as execution is in the same thread.
+        assertTrue(resultFuture.isDone());
+        assertThat(resultFuture, willBe(defaultStatus));
+    }
+}
\ No newline at end of file