You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2021/11/03 16:18:35 UTC

[bookkeeper] branch master updated: Return too many requests error when the write cache flush timeout occurs (#2860)

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3adb0b2  Return too many requests error when the write cache flush timeout occurs (#2860)
3adb0b2 is described below

commit 3adb0b2b093bca4a6bebbcc8ae38900d0acff2c9
Author: pradeepbn <pr...@gmail.com>
AuthorDate: Wed Nov 3 09:18:28 2021 -0700

    Return too many requests error when the write cache flush timeout occurs (#2860)
    
    Return a "too many requests" error when an OperationRejectedException occurs because of internal resource saturation.
---
 .../apache/bookkeeper/bookie/BookieException.java  |  2 +-
 .../bookkeeper/proto/WriteEntryProcessor.java      |  2 +-
 .../bookkeeper/proto/WriteEntryProcessorV3.java    |  2 +-
 .../bookkeeper/proto/WriteEntryProcessorTest.java  | 33 ++++++++++++++++++++++
 .../proto/WriteEntryProcessorV3Test.java           | 32 +++++++++++++++++++++
 5 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 83002ce..02bddd7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -186,7 +186,7 @@ public abstract class BookieException extends Exception {
     }
 
     /**
-     * Signals that a ledger has been fenced in a bookie. No more entries can be appended to that ledger.
+     * Signals that an operation on a ledger has been rejected by an internal component because of resource saturation.
      */
     public static class OperationRejectedException extends BookieException {
         public OperationRejectedException() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index a61e0d5..7dbd30c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -85,7 +85,7 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Operation rejected while writing {}", request, e);
             }
-            rc = BookieProtocol.EIO;
+            rc = BookieProtocol.ETOOMANYREQUESTS;
         } catch (IOException e) {
             LOG.error("Error writing {}", request, e);
             rc = BookieProtocol.EIO;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index d8ed70c..ed2e304 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -131,7 +131,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
             if (logger.isDebugEnabled()) {
                 logger.debug("Operation rejected while writing {}", request, e);
             }
-            status = StatusCode.EIO;
+            status = StatusCode.ETOOMANYREQUESTS;
         } catch (IOException e) {
             logger.error("Error writing entry:{} to ledger:{}",
                     entryId, ledgerId, e);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index bbcffea..21bca29 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -34,9 +34,12 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
 import org.apache.bookkeeper.proto.BookieProtocol.Response;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -218,4 +221,34 @@ public class WriteEntryProcessorTest {
         response.recycle();
     }
 
+    @Test
+    public void testWritesCacheFlushTimeout() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            throw new BookieException.OperationRejectedException();
+        }).when(bookie).addEntry(
+                any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(bookie, times(1))
+                .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(Response.class), any(ChannelPromise.class));
+
+        latch.await();
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.ETOOMANYREQUESTS, response.getErrorCode());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 292dc51..76ead52 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -37,6 +37,7 @@ import io.netty.channel.DefaultChannelPromise;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
@@ -241,6 +242,37 @@ public class WriteEntryProcessorV3Test {
     }
 
     @Test
+    public void testWritesCacheFlushTimeout() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            throw new BookieException.OperationRejectedException();
+        }).when(bookie).addEntry(
+                any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(bookie, times(1))
+                .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.ETOOMANYREQUESTS, response.getStatus());
+    }
+
+    @Test
     public void testWritesWithClientNotAcceptingReponses() throws Exception {
         when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(5L);