You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/10/07 10:43:44 UTC

[ignite-3] branch main updated: IGNITE-15696 Fix NPE in StripeEntryHandler and logging bug in StripeExceptionHandler. Fixes #385

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a21454a  IGNITE-15696 Fix NPE in StripeEntryHandler and logging bug in StripeExceptionHandler. Fixes #385
a21454a is described below

commit a21454a23f8b2ab215253f47f459f20dda895c9f
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Thu Oct 7 13:42:15 2021 +0300

    IGNITE-15696 Fix NPE in StripeEntryHandler and logging bug in StripeExceptionHandler. Fixes #385
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../ignite/raft/jraft/core/FSMCallerImpl.java       |  3 +--
 .../raft/jraft/disruptor/StripedDisruptor.java      | 21 +++++++++++----------
 .../ignite/raft/jraft/option/RaftOptions.java       |  3 +--
 .../raft/jraft/storage/impl/LogManagerImpl.java     |  3 +--
 .../ignite/disruptor/StripedDisruptorTest.java      | 12 ++----------
 5 files changed, 16 insertions(+), 26 deletions(-)

diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index c23b4e7..4d93213 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -432,8 +432,7 @@ public class FSMCallerImpl implements FSMCaller {
             }
         }
         try {
-            //TODO: IGNITE-15568 This limitation is used until the issue is not fixed.
-            if ((node.getOptions().getRaftOptions().getApplyBatch() == 1 || endOfBatch) && maxCommittedIndex >= 0) {
+            if (endOfBatch && maxCommittedIndex >= 0) {
                 this.currTask = TaskType.COMMITTED;
                 doCommitted(maxCommittedIndex);
                 maxCommittedIndex = -1L; // reset maxCommittedIndex
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index 000f424..4269a93 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -176,13 +176,13 @@ public class StripedDisruptor<T extends GroupAware> {
      * It routs an event to the event handler for a group.
      */
     private class StripeEntryHandler implements EventHandler<T> {
-        private final ConcurrentHashMap<String, EventHandler<T>> subscrivers;
+        private final ConcurrentHashMap<String, EventHandler<T>> subscribers;
 
         /**
          * The constructor.
          */
         StripeEntryHandler() {
-            subscrivers = new ConcurrentHashMap<>();
+            subscribers = new ConcurrentHashMap<>();
         }
 
         /**
@@ -192,7 +192,7 @@ public class StripedDisruptor<T extends GroupAware> {
          * @param handler Event handler for the group specified.
          */
         void subscribe(String group, EventHandler<T> handler) {
-            subscrivers.put(group, handler);
+            subscribers.put(group, handler);
         }
 
         /**
@@ -201,21 +201,22 @@ public class StripedDisruptor<T extends GroupAware> {
          * @param group Group id.
          */
         void unsubscribe(String group) {
-            subscrivers.remove(group);
+            subscribers.remove(group);
         }
 
         /** {@inheritDoc} */
         @Override public void onEvent(T event, long sequence, boolean endOfBatch) throws Exception {
-            EventHandler<T> handler = subscrivers.get(event.groupId());
+            EventHandler<T> handler = subscribers.get(event.groupId());
 
             assert handler != null : format("Group of the event is unsupported [group={}, event={}]", event.groupId(), event);
 
-            handler.onEvent(event, sequence, endOfBatch);
+            //TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching tasks until IGNITE-15568 is fixed.
+            handler.onEvent(event, sequence, true);
         }
     }
 
     /**
-     * Striped disruptor exxception handler.
+     * Striped disruptor exception handler.
      * It prints into log when an exception has occurred and route it to the handler for group.
      */
     private class StripeExceptionHandler implements ExceptionHandler<T> {
@@ -254,12 +255,12 @@ public class StripedDisruptor<T extends GroupAware> {
 
         /** {@inheritDoc} */
         @Override public void handleOnStartException(Throwable ex) {
-            LOG.error("Fail to start disruptor [name={}]", name, ex);
+            LOG.error("Fail to start disruptor [name={}]", ex, name);
         }
 
         /** {@inheritDoc} */
         @Override public void handleOnShutdownException(Throwable ex) {
-            LOG.error("Fail to shutdown disruptor [name={}]", name, ex);
+            LOG.error("Fail to shutdown disruptor [name={}]", ex, name);
 
         }
 
@@ -267,7 +268,7 @@ public class StripedDisruptor<T extends GroupAware> {
         @Override public void handleEventException(Throwable ex, long sequence, T event) {
             BiConsumer<T, Throwable> handler = subscrivers.get(event.groupId());
 
-            LOG.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", name, event, handler != null, ex);
+            LOG.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", ex, name, event, handler != null);
 
             if (handler != null)
                 handler.accept(event, ex);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
index 731d950..0c91980 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
@@ -68,8 +68,7 @@ public class RaftOptions implements Copiable<RaftOptions> {
     /**
      * Maximum number of tasks that can be applied in a batch
      */
-    //TODO: IGNITE-15568 Default value will be returned to 32 after the issue would be fixed.
-    private int applyBatch = 1;
+    private int applyBatch = 32;
 
     /**
      * Call fsync when need
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 3c9d5fc..2f0aa77 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -565,8 +565,7 @@ public class LogManagerImpl implements LogManager {
                 }
             }
 
-            //TODO: IGNITE-15568 This limitation is used until the issue is not fixed.
-            if (nodeOptions.getRaftOptions().getApplyBatch() == 1 || endOfBatch) {
+            if (endOfBatch) {
                 this.lastId = this.ab.flush();
                 setDiskId(this.lastId);
             }
diff --git a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index f0cb385..4478176 100644
--- a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -25,10 +25,8 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.LoggerMessageHelper;
 import org.apache.ignite.raft.jraft.disruptor.GroupAware;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
-import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -36,9 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Tests for striped disruptor.
  */
 public class StripedDisruptorTest extends IgniteAbstractTest {
-    /** Default RAFT options. */
-    private static final RaftOptions options = new RaftOptions();
-
     /**
      * Checks the correctness of disruptor batching in a handler.
      * This test creates only one stripe in order to the real Disruptor is shared between two groups.
@@ -47,9 +42,6 @@ public class StripedDisruptorTest extends IgniteAbstractTest {
      */
     @Test
     public void testDisruptorBatch() throws Exception {
-        //TODO: IGNITE-15568 This asserts should be deleted after the issue would be fixed.
-        assertEquals(options.getApplyBatch(), 1);
-
         StripedDisruptor<GroupAwareTestObj> disruptor = new StripedDisruptor<>("test-disruptor",
             16384,
             GroupAwareTestObj::new,
@@ -129,10 +121,10 @@ public class StripedDisruptorTest extends IgniteAbstractTest {
         int applied = 0;
 
         /** {@inheritDoc} */
-        @Override public void onEvent(GroupAwareTestObj event, long sequence, boolean endOfBatch) throws Exception {
+        @Override public void onEvent(GroupAwareTestObj event, long sequence, boolean endOfBatch) {
             batch.add(event.num);
 
-            if (endOfBatch || batch.size() >= options.getApplyBatch()) {
+            if (endOfBatch) {
                 applied += batch.size();
 
                 batch.clear();