You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/10/07 10:43:44 UTC
[ignite-3] branch main updated: IGNITE-15696 Fix NPE in StripeEntryHandler and logging bug in StripeExceptionHandler. Fixes #385
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a21454a IGNITE-15696 Fix NPE in StripeEntryHandler and logging bug in StripeExceptionHandler. Fixes #385
a21454a is described below
commit a21454a23f8b2ab215253f47f459f20dda895c9f
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Thu Oct 7 13:42:15 2021 +0300
IGNITE-15696 Fix NPE in StripeEntryHandler and logging bug in StripeExceptionHandler. Fixes #385
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../ignite/raft/jraft/core/FSMCallerImpl.java | 3 +--
.../raft/jraft/disruptor/StripedDisruptor.java | 21 +++++++++++----------
.../ignite/raft/jraft/option/RaftOptions.java | 3 +--
.../raft/jraft/storage/impl/LogManagerImpl.java | 3 +--
.../ignite/disruptor/StripedDisruptorTest.java | 12 ++----------
5 files changed, 16 insertions(+), 26 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index c23b4e7..4d93213 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -432,8 +432,7 @@ public class FSMCallerImpl implements FSMCaller {
}
}
try {
- //TODO: IGNITE-15568 This limitation is used until the issue is not fixed.
- if ((node.getOptions().getRaftOptions().getApplyBatch() == 1 || endOfBatch) && maxCommittedIndex >= 0) {
+ if (endOfBatch && maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
doCommitted(maxCommittedIndex);
maxCommittedIndex = -1L; // reset maxCommittedIndex
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index 000f424..4269a93 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -176,13 +176,13 @@ public class StripedDisruptor<T extends GroupAware> {
* It routs an event to the event handler for a group.
*/
private class StripeEntryHandler implements EventHandler<T> {
- private final ConcurrentHashMap<String, EventHandler<T>> subscrivers;
+ private final ConcurrentHashMap<String, EventHandler<T>> subscribers;
/**
* The constructor.
*/
StripeEntryHandler() {
- subscrivers = new ConcurrentHashMap<>();
+ subscribers = new ConcurrentHashMap<>();
}
/**
@@ -192,7 +192,7 @@ public class StripedDisruptor<T extends GroupAware> {
* @param handler Event handler for the group specified.
*/
void subscribe(String group, EventHandler<T> handler) {
- subscrivers.put(group, handler);
+ subscribers.put(group, handler);
}
/**
@@ -201,21 +201,22 @@ public class StripedDisruptor<T extends GroupAware> {
* @param group Group id.
*/
void unsubscribe(String group) {
- subscrivers.remove(group);
+ subscribers.remove(group);
}
/** {@inheritDoc} */
@Override public void onEvent(T event, long sequence, boolean endOfBatch) throws Exception {
- EventHandler<T> handler = subscrivers.get(event.groupId());
+ EventHandler<T> handler = subscribers.get(event.groupId());
assert handler != null : format("Group of the event is unsupported [group={}, event={}]", event.groupId(), event);
- handler.onEvent(event, sequence, endOfBatch);
+ //TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching tasks until IGNITE-15568 has fixed.
+ handler.onEvent(event, sequence, true);
}
}
/**
- * Striped disruptor exxception handler.
+ * Striped disruptor exception handler.
* It prints into log when an exception has occurred and route it to the handler for group.
*/
private class StripeExceptionHandler implements ExceptionHandler<T> {
@@ -254,12 +255,12 @@ public class StripedDisruptor<T extends GroupAware> {
/** {@inheritDoc} */
@Override public void handleOnStartException(Throwable ex) {
- LOG.error("Fail to start disruptor [name={}]", name, ex);
+ LOG.error("Fail to start disruptor [name={}]", ex, name);
}
/** {@inheritDoc} */
@Override public void handleOnShutdownException(Throwable ex) {
- LOG.error("Fail to shutdown disruptor [name={}]", name, ex);
+ LOG.error("Fail to shutdown disruptor [name={}]", ex, name);
}
@@ -267,7 +268,7 @@ public class StripedDisruptor<T extends GroupAware> {
@Override public void handleEventException(Throwable ex, long sequence, T event) {
BiConsumer<T, Throwable> handler = subscrivers.get(event.groupId());
- LOG.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", name, event, handler != null, ex);
+ LOG.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", ex, name, event, handler != null);
if (handler != null)
handler.accept(event, ex);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
index 731d950..0c91980 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
@@ -68,8 +68,7 @@ public class RaftOptions implements Copiable<RaftOptions> {
/**
* Maximum number of tasks that can be applied in a batch
*/
- //TODO: IGNITE-15568 Default value will be returned to 32 after the issue would be fixed.
- private int applyBatch = 1;
+ private int applyBatch = 32;
/**
* Call fsync when need
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 3c9d5fc..2f0aa77 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -565,8 +565,7 @@ public class LogManagerImpl implements LogManager {
}
}
- //TODO: IGNITE-15568 This limitation is used until the issue is not fixed.
- if (nodeOptions.getRaftOptions().getApplyBatch() == 1 || endOfBatch) {
+ if (endOfBatch) {
this.lastId = this.ab.flush();
setDiskId(this.lastId);
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index f0cb385..4478176 100644
--- a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -25,10 +25,8 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.raft.jraft.disruptor.GroupAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
-import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -36,9 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Tests for striped disruptor.
*/
public class StripedDisruptorTest extends IgniteAbstractTest {
- /** Default RAFT options. */
- private static final RaftOptions options = new RaftOptions();
-
/**
* Checks the correctness of disruptor batching in a handler.
* This test creates only one stripe in order to the real Disruptor is shared between two groups.
@@ -47,9 +42,6 @@ public class StripedDisruptorTest extends IgniteAbstractTest {
*/
@Test
public void testDisruptorBatch() throws Exception {
- //TODO: IGNITE-15568 This asserts should be deleted after the issue would be fixed.
- assertEquals(options.getApplyBatch(), 1);
-
StripedDisruptor<GroupAwareTestObj> disruptor = new StripedDisruptor<>("test-disruptor",
16384,
GroupAwareTestObj::new,
@@ -129,10 +121,10 @@ public class StripedDisruptorTest extends IgniteAbstractTest {
int applied = 0;
/** {@inheritDoc} */
- @Override public void onEvent(GroupAwareTestObj event, long sequence, boolean endOfBatch) throws Exception {
+ @Override public void onEvent(GroupAwareTestObj event, long sequence, boolean endOfBatch) {
batch.add(event.num);
- if (endOfBatch || batch.size() >= options.getApplyBatch()) {
+ if (endOfBatch) {
applied += batch.size();
batch.clear();