You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2021/11/03 16:18:35 UTC
[bookkeeper] branch master updated: Return too many requests error when the write cache flush timeout occurs (#2860)
This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3adb0b2 Return too many requests error when the write cache flush timeout occurs (#2860)
3adb0b2 is described below
commit 3adb0b2b093bca4a6bebbcc8ae38900d0acff2c9
Author: pradeepbn <pr...@gmail.com>
AuthorDate: Wed Nov 3 09:18:28 2021 -0700
Return too many requests error when the write cache flush timeout occurs (#2860)
Return a "too many requests" error when an OperationRejectedException is thrown, which happens because of internal resource saturation.
---
.../apache/bookkeeper/bookie/BookieException.java | 2 +-
.../bookkeeper/proto/WriteEntryProcessor.java | 2 +-
.../bookkeeper/proto/WriteEntryProcessorV3.java | 2 +-
.../bookkeeper/proto/WriteEntryProcessorTest.java | 33 ++++++++++++++++++++++
.../proto/WriteEntryProcessorV3Test.java | 32 +++++++++++++++++++++
5 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 83002ce..02bddd7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -186,7 +186,7 @@ public abstract class BookieException extends Exception {
}
/**
- * Signals that a ledger has been fenced in a bookie. No more entries can be appended to that ledger.
+ * Signals that a ledger's operation has been rejected by an internal component because of the resource saturation.
*/
public static class OperationRejectedException extends BookieException {
public OperationRejectedException() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index a61e0d5..7dbd30c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -85,7 +85,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
if (LOG.isDebugEnabled()) {
LOG.debug("Operation rejected while writing {}", request, e);
}
- rc = BookieProtocol.EIO;
+ rc = BookieProtocol.ETOOMANYREQUESTS;
} catch (IOException e) {
LOG.error("Error writing {}", request, e);
rc = BookieProtocol.EIO;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index d8ed70c..ed2e304 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -131,7 +131,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
if (logger.isDebugEnabled()) {
logger.debug("Operation rejected while writing {}", request, e);
}
- status = StatusCode.EIO;
+ status = StatusCode.ETOOMANYREQUESTS;
} catch (IOException e) {
logger.error("Error writing entry:{} to ledger:{}",
entryId, ledgerId, e);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index bbcffea..21bca29 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -34,9 +34,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
import org.apache.bookkeeper.proto.BookieProtocol.Response;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -218,4 +221,34 @@ public class WriteEntryProcessorTest {
response.recycle();
}
+ @Test
+ public void testWritesCacheFlushTimeout() throws Exception {
+ when(bookie.isReadOnly()).thenReturn(false);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+ when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+ doAnswer(invocationOnMock -> {
+ throw new BookieException.OperationRejectedException();
+ }).when(bookie).addEntry(
+ any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(Response.class), any(ChannelPromise.class));
+
+ latch.await();
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(BookieProtocol.ETOOMANYREQUESTS, response.getErrorCode());
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 292dc51..76ead52 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -37,6 +37,7 @@ import io.netty.channel.DefaultChannelPromise;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -241,6 +242,37 @@ public class WriteEntryProcessorV3Test {
}
@Test
+ public void testWritesCacheFlushTimeout() throws Exception {
+ when(bookie.isReadOnly()).thenReturn(false);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+ when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+ doAnswer(invocationOnMock -> {
+ throw new BookieException.OperationRejectedException();
+ }).when(bookie).addEntry(
+ any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any());
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+ latch.await();
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(StatusCode.ETOOMANYREQUESTS, response.getStatus());
+ }
+
+ @Test
public void testWritesWithClientNotAcceptingReponses() throws Exception {
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(5L);