You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ni...@apache.org on 2022/09/20 08:12:45 UTC
[bookkeeper] 22/25: Check if channel closed before processing read request (#3486)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch ds-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 99563890aa26851828be8e84b8ef09a86572d6e0
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Mon Sep 19 02:57:51 2022 -0700
Check if channel closed before processing read request (#3486)
* Check if channel closed before processing read request
* Add missing call to onReadRequestFinish()
* Fix test
* Mock more tests
Co-authored-by: Nicolò Boschi <bo...@gmail.com>
(cherry picked from commit 1313b8e2e7964a0bbf2e221f49fe30a1fd812d31)
---
.../main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java | 7 +++++++
.../java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java | 7 +++++++
.../org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java | 1 +
.../apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java | 1 +
.../java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java | 1 +
.../java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java | 1 +
.../org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java | 1 +
7 files changed, 19 insertions(+)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 60de0440c1..7647e4afef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -65,6 +65,13 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
if (LOG.isDebugEnabled()) {
LOG.debug("Received new read request: {}", request);
}
+ if (!channel.isOpen()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping read request for closed channel: {}", channel);
+ }
+ requestProcessor.onReadRequestFinish();
+ return;
+ }
int errorCode = BookieProtocol.EOK;
long startTimeNanos = MathUtils.nowInNano();
ByteBuf data = null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index a8ecc11d7e..6b3b624141 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -248,6 +248,13 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
public void safeRun() {
requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+ if (!channel.isOpen()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping read request for closed channel: {}", channel);
+ }
+ requestProcessor.onReadRequestFinish();
+ return;
+ }
if (!isVersionCompatible()) {
ReadResponse readResponse = ReadResponse.newBuilder()
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 1b07fbb4d1..ecb2676bc8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -71,6 +71,7 @@ public class ForceLedgerProcessorV3Test {
.build())
.build();
channel = mock(Channel.class);
+ when(channel.isOpen()).thenReturn(true);
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
index 393d5ddf33..9eae1b9c0d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -83,6 +83,7 @@ public class LongPollReadEntryProcessorV3Test {
.build();
Channel channel = mock(Channel.class);
+ when(channel.isOpen()).thenReturn(true);
Bookie bookie = mock(Bookie.class);
BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index fcd74b1705..1fd42fd16f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -60,6 +60,7 @@ public class ReadEntryProcessorTest {
@Before
public void setup() throws IOException, BookieException {
channel = mock(Channel.class);
+ when(channel.isOpen()).thenReturn(true);
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index a69245d4a9..0752c05c32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -64,6 +64,7 @@ public class WriteEntryProcessorTest {
new byte[0],
Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
channel = mock(Channel.class);
+ when(channel.isOpen()).thenReturn(true);
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 7abaa100c8..477d83bb2a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -76,6 +76,7 @@ public class WriteEntryProcessorV3Test {
.build())
.build();
channel = mock(Channel.class);
+ when(channel.isOpen()).thenReturn(true);
bookie = mock(Bookie.class);
requestProcessor = mock(BookieRequestProcessor.class);
when(requestProcessor.getBookie()).thenReturn(bookie);