You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ni...@apache.org on 2022/09/19 10:00:33 UTC

[bookkeeper] 22/22: Check if channel closed before processing read request (#3486)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch ds-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 99563890aa26851828be8e84b8ef09a86572d6e0
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Mon Sep 19 02:57:51 2022 -0700

    Check if channel closed before processing read request (#3486)
    
    * Check if channel closed before processing read request
    
    * Add missing call to onReadRequestFinish()
    
    * Fix test
    
    * Stub channel.isOpen() on the mocked Channel in more tests
    
    Co-authored-by: Nicolò Boschi <bo...@gmail.com>
    (cherry picked from commit 1313b8e2e7964a0bbf2e221f49fe30a1fd812d31)
---
 .../main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java  | 7 +++++++
 .../java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java     | 7 +++++++
 .../org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java    | 1 +
 .../apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java  | 1 +
 .../java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java   | 1 +
 .../java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java  | 1 +
 .../org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java     | 1 +
 7 files changed, 19 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 60de0440c1..7647e4afef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -65,6 +65,13 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received new read request: {}", request);
         }
+        if (!channel.isOpen()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping read request for closed channel: {}", channel);
+            }
+            requestProcessor.onReadRequestFinish();
+            return;
+        }
         int errorCode = BookieProtocol.EOK;
         long startTimeNanos = MathUtils.nowInNano();
         ByteBuf data = null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index a8ecc11d7e..6b3b624141 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -248,6 +248,13 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
     public void safeRun() {
         requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
             MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+        if (!channel.isOpen()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping read request for closed channel: {}", channel);
+            }
+            requestProcessor.onReadRequestFinish();
+            return;
+        }
 
         if (!isVersionCompatible()) {
             ReadResponse readResponse = ReadResponse.newBuilder()
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 1b07fbb4d1..ecb2676bc8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -71,6 +71,7 @@ public class ForceLedgerProcessorV3Test {
                 .build())
             .build();
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
index 393d5ddf33..9eae1b9c0d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -83,6 +83,7 @@ public class LongPollReadEntryProcessorV3Test {
             .build();
 
         Channel channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         Bookie bookie = mock(Bookie.class);
 
         BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index fcd74b1705..1fd42fd16f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -60,6 +60,7 @@ public class ReadEntryProcessorTest {
     @Before
     public void setup() throws IOException, BookieException {
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index a69245d4a9..0752c05c32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -64,6 +64,7 @@ public class WriteEntryProcessorTest {
             new byte[0],
             Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 7abaa100c8..477d83bb2a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -76,6 +76,7 @@ public class WriteEntryProcessorV3Test {
                 .build())
             .build();
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);