You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/25 10:39:43 UTC
[ignite-3] 01/01: IGNITE-17574 Fix assertion error ECATCHUP expected
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-17574
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 425daec3c1324021b5e0ff76cf16339910c1725a
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Aug 25 14:39:32 2022 +0400
IGNITE-17574 Fix assertion error ECATCHUP expected
---
.../testframework/util/DirectExecutor.java | 101 +++++++++++++++++++++
.../raft/RebalanceRaftGroupEventsListener.java | 11 ++-
.../raft/RebalanceRaftGroupEventsListenerTest.java | 76 ++++++++++++++++
3 files changed, 185 insertions(+), 3 deletions(-)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
new file mode 100644
index 0000000000..84dbdf4a15
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
@@ -0,0 +1,101 @@
+package org.apache.ignite.internal.testframework.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.jetbrains.annotations.NotNull;
+
+/** Executor service that executes tasks within the thread task was submitted in. */
+public class DirectExecutor implements ExecutorService {
+ private final AtomicBoolean isShutdown = new AtomicBoolean();
+
+ @Override
+ public void shutdown() {
+ isShutdown.set(true);
+ }
+
+ @NotNull
+ @Override
+ public List<Runnable> shutdownNow() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return isShutdown.get();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return isShutdown.get();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
+ return true;
+ }
+
+ @NotNull
+ @Override
+ public <T> Future<T> submit(@NotNull Callable<T> task) {
+ T result;
+ try {
+ result = task.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return CompletableFuture.completedFuture(result);
+ }
+
+ @NotNull
+ @Override
+ public <T> Future<T> submit(@NotNull Runnable task, T result) {
+ task.run();
+ return CompletableFuture.completedFuture(result);
+ }
+
+ @NotNull
+ @Override
+ public Future<?> submit(@NotNull Runnable task) {
+ task.run();
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @NotNull
+ @Override
+ public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @NotNull
+ @Override
+ public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
+ throws InterruptedException {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @NotNull
+ @Override
+ public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public void execute(@NotNull Runnable command) {
+ command.run();
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index bd562157e5..58e3b37953 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -191,14 +191,19 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
}
try {
- if (status == null) {
- // leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
+ assert status != null;
+
+ RaftError raftError = status.getRaftError();
+
+ if (raftError == RaftError.EPERM && "Leader stepped down.".equals(status.getErrorMsg())) {
+ // Leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
LOG.info("Leader stepped down during rebalance [partId={}]", partId);
return;
}
- assert status.getRaftError() == RaftError.ECATCHUP : "According to the JRaft protocol, RaftError.ECATCHUP is expected.";
+ assert raftError == RaftError.ECATCHUP : "According to the JRaft protocol, " + RaftError.ECATCHUP
+ + " is expected, got " + raftError;
LOG.debug("Error occurred during rebalance [partId={}]", partId);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
new file mode 100644
index 0000000000..bb729bb6cd
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
@@ -0,0 +1,76 @@
+package org.apache.ignite.internal.table.distributed.raft;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.util.DirectExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class RebalanceRaftGroupEventsListenerTest {
+ /**
+ * Tests that {@link RebalanceRaftGroupEventsListener} handles correctly a situation when
+ * ConfigurationCtx#reset is called with null status.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void onReconfigurationErrorCalledFromResetWithNullStatus() throws Exception {
+ IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ RaftGroupEventsListener spy = Mockito.spy(new RebalanceRaftGroupEventsListener(null, null,null, 0, busyLock, null, null));
+
+ NodeImpl node = Mockito.mock(NodeImpl.class);
+
+ NodeOptions nodeOptions = new NodeOptions();
+ nodeOptions.setCommonExecutor(new DirectExecutor());
+ nodeOptions.setRaftGrpEvtsLsnr(spy);
+
+ when(node.getOptions()).thenReturn(nodeOptions);
+
+ Class<?> confCtxClass = Class.forName("org.apache.ignite.raft.jraft.core.NodeImpl$ConfigurationCtx");
+
+ Constructor<?> constructor = confCtxClass.getDeclaredConstructor(NodeImpl.class);
+ constructor.setAccessible(true);
+
+ // ConfigurationCtx object.
+ Object confCtx = constructor.newInstance(node);
+
+ var resultFuture = new CompletableFuture<Status>();
+
+ IgniteTestUtils.setFieldValue(confCtx, "done", (Closure) resultFuture::complete);
+
+ Method resetMethod = confCtxClass.getDeclaredMethod("reset", Status.class);
+ resetMethod.setAccessible(true);
+
+ // Execute reset method with null status
+ resetMethod.invoke(confCtx, new Object[]{null});
+
+ Status defaultStatus = new Status(RaftError.EPERM, "Leader stepped down.");
+
+ // onReconfigurationError should not be called with null status but rather with a default status.
+ verify(spy, times(1)).onReconfigurationError(eq(defaultStatus), any(), anyLong());
+
+ // Future should be already done as execution is in the same thread.
+ assertTrue(resultFuture.isDone());
+ assertThat(resultFuture, willBe(defaultStatus));
+ }
+}
\ No newline at end of file