You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/25 20:59:47 UTC

[ignite-3] branch main updated: IGNITE-17574 Fix assertion error ECATCHUP expected (#1036)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b431b9557 IGNITE-17574 Fix assertion error ECATCHUP expected (#1036)
6b431b9557 is described below

commit 6b431b955743b3dd7903fbffbecda3122c4a23b1
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Aug 26 00:59:42 2022 +0400

    IGNITE-17574 Fix assertion error ECATCHUP expected (#1036)
---
 .../testframework/util/DirectExecutor.java         | 109 +++++++++++++++++++++
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |   5 +-
 .../raft/RebalanceRaftGroupEventsListener.java     |  12 ++-
 .../raft/RebalanceRaftGroupEventsListenerTest.java |  93 ++++++++++++++++++
 4 files changed, 214 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
new file mode 100644
index 0000000000..449ddaf39c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/util/DirectExecutor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Executor service that runs every task synchronously in the thread that submitted it; tasks are never queued. */
+public class DirectExecutor implements ExecutorService {
+    private final AtomicBoolean isShutdown = new AtomicBoolean();
+
+    @Override
+    public void shutdown() {
+        isShutdown.set(true);
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        // Nothing can be pending because tasks run inline, but the shutdown flag must still be raised.
+        isShutdown.set(true);
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return isShutdown.get();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return isShutdown.get();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return true;
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        try {
+            return CompletableFuture.completedFuture(task.call());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        task.run();
+        return CompletableFuture.completedFuture(result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        task.run();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        command.run();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index c7565cb030..08e77d428d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -127,6 +127,8 @@ import org.apache.ignite.raft.jraft.util.concurrent.LongHeldDetectingReadWriteLo
 public class NodeImpl implements Node, RaftServerService {
     private static final IgniteLogger LOG = Loggers.forClass(NodeImpl.class);
 
+    public static final Status LEADER_STEPPED_DOWN = new Status(RaftError.EPERM, "Leader stepped down.");
+
     // Max retry times when applying tasks.
     private static final int MAX_APPLY_RETRY_TIMES = 3;
 
@@ -456,8 +458,7 @@ public class NodeImpl implements Node, RaftServerService {
                 };
 
                 // TODO: in case of changePeerAsync this invocation is useless as far as we have already sent OK response in done closure.
-                Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), newDone, st != null ? st :
-                        new Status(RaftError.EPERM, "Leader stepped down."));
+                Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), newDone, st != null ? st : LEADER_STEPPED_DOWN);
                 this.done = null;
             }
         }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index bd562157e5..e2e8c2f871 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -25,6 +25,7 @@ import static org.apache.ignite.internal.metastorage.client.Operations.remove;
 import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
 import static org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey;
 import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.raft.jraft.core.NodeImpl.LEADER_STEPPED_DOWN;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -191,14 +192,19 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
         }
 
         try {
-            if (status == null) {
-                // leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
+            assert status != null;
+
+            if (status.equals(LEADER_STEPPED_DOWN)) {
+                // Leader stepped down, so we are expecting RebalanceRaftGroupEventsListener.onLeaderElected to be called on a new leader.
                 LOG.info("Leader stepped down during rebalance [partId={}]", partId);
 
                 return;
             }
 
-            assert status.getRaftError() == RaftError.ECATCHUP : "According to the JRaft protocol, RaftError.ECATCHUP is expected.";
+            RaftError raftError = status.getRaftError();
+
+            assert raftError == RaftError.ECATCHUP : "According to the JRaft protocol, " + RaftError.ECATCHUP
+                    + " is expected, got " + raftError;
 
             LOG.debug("Error occurred during rebalance [partId={}]", partId);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
new file mode 100644
index 0000000000..7a15981de5
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.raft.jraft.core.NodeImpl.LEADER_STEPPED_DOWN;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.util.DirectExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.raft.jraft.Closure;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class RebalanceRaftGroupEventsListenerTest {
+    /**
+     * Verifies that when {@code NodeImpl.ConfigurationCtx#reset} is invoked with a {@code null} status,
+     * {@link RebalanceRaftGroupEventsListener} is notified with {@code LEADER_STEPPED_DOWN} instead of {@code null}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    void testOnReconfigurationErrorCalledFromResetWithNullStatus() throws Exception {
+        RaftGroupEventsListener listener = Mockito.spy(
+                new RebalanceRaftGroupEventsListener(null, null, null, 0, new IgniteSpinBusyLock(), null, null)
+        );
+
+        // The direct executor runs submitted closures synchronously in the calling thread.
+        NodeOptions options = new NodeOptions();
+        options.setRaftGrpEvtsLsnr(listener);
+        options.setCommonExecutor(new DirectExecutor());
+
+        NodeImpl nodeMock = Mockito.mock(NodeImpl.class);
+
+        when(nodeMock.getOptions()).thenReturn(options);
+
+        // ConfigurationCtx is reached through reflection (presumably it is not accessible from this package).
+        Class<?> ctxClass = Class.forName("org.apache.ignite.raft.jraft.core.NodeImpl$ConfigurationCtx");
+
+        Constructor<?> ctxConstructor = ctxClass.getDeclaredConstructor(NodeImpl.class);
+        ctxConstructor.setAccessible(true);
+
+        Object ctx = ctxConstructor.newInstance(nodeMock);
+
+        // Completes with whatever status reaches the "done" closure.
+        CompletableFuture<Status> reportedStatus = new CompletableFuture<>();
+
+        IgniteTestUtils.setFieldValue(ctx, "done", (Closure) reportedStatus::complete);
+
+        Method reset = ctxClass.getDeclaredMethod("reset", Status.class);
+        reset.setAccessible(true);
+
+        // Invoke reset with a null status (the cast passes a single null argument, not a null varargs array).
+        reset.invoke(ctx, (Object) null);
+
+        // onReconfigurationError must receive the default status rather than null.
+        verify(listener, times(1)).onReconfigurationError(eq(LEADER_STEPPED_DOWN), any(), anyLong());
+
+        // Execution happens in the same thread, so the future must already be complete.
+        assertTrue(reportedStatus.isDone());
+        assertThat(reportedStatus, willBe(LEADER_STEPPED_DOWN));
+    }
+}