You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/05 07:03:08 UTC

[GitHub] sijie closed pull request #1315: Issue 1314: Provide a mechanism to allow high priority writes to readonly bookies

sijie closed pull request #1315: Issue 1314: Provide a mechanism to allow high priority writes to readonly bookies
URL: https://github.com/apache/bookkeeper/pull/1315
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 73221731e..b391c531c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -885,16 +885,6 @@ private LedgerDirsListener getLedgerDirsListener() {
 
         return new LedgerDirsListener() {
 
-            @Override
-            public void diskFull(File disk) {
-                // Nothing needs to be handled here.
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-                // Nothing needs to be handled here.
-            }
-
             @Override
             public void diskFailed(File disk) {
                 // Shutdown the bookie on disk failure.
@@ -902,8 +892,9 @@ public void diskFailed(File disk) {
             }
 
             @Override
-            public void allDisksFull() {
+            public void allDisksFull(boolean highPriorityWritesAllowed) {
                 // Transition to readOnly mode on all disks full
+                stateManager.setHighPriorityWritesAvailability(highPriorityWritesAllowed);
                 stateManager.transitionToReadOnlyMode();
             }
 
@@ -916,12 +907,14 @@ public void fatalError() {
             @Override
             public void diskWritable(File disk) {
                 // Transition to writable mode when a disk becomes writable again.
+                stateManager.setHighPriorityWritesAvailability(true);
                 stateManager.transitionToWritableMode();
             }
 
             @Override
             public void diskJustWritable(File disk) {
                 // Transition to writable mode when a disk becomes writable again.
+                stateManager.setHighPriorityWritesAvailability(true);
                 stateManager.transitionToWritableMode();
             }
         };
@@ -962,6 +955,15 @@ public boolean isReadOnly() {
         return stateManager.isReadOnly();
     }
 
+    /**
+     * Check whether Bookie is available for high priority writes.
+     *
+     * @return true if the bookie is able to take high priority writes.
+     */
+    public boolean isAvailableForHighPriorityWrites() {
+        return stateManager.isAvailableForHighPriorityWrites();
+    }
+
     public boolean isRunning() {
         return stateManager.isRunning();
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
index 798db41d5..1689b11f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -61,6 +61,7 @@
     private final BookieStatus bookieStatus = new BookieStatus();
     private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
     private final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
+    private volatile boolean availableForHighPriorityWrites = true;
 
     private final String bookieId;
     private ShutdownHandler shutdownHandler;
@@ -68,7 +69,6 @@
     // Expose Stats
     private final StatsLogger statsLogger;
 
-
     public BookieStateManager(ServerConfiguration conf, StatsLogger statsLogger,
            MetadataBookieDriver metadataDriver, LedgerDirsManager ledgerDirsManager) throws IOException {
         this.conf = conf;
@@ -135,6 +135,16 @@ public boolean isReadOnly(){
         return forceReadOnly.get() || bookieStatus.isInReadOnlyMode();
     }
 
+    @Override
+    public boolean isAvailableForHighPriorityWrites() {
+        return availableForHighPriorityWrites;
+    }
+
+    @Override
+    public void setHighPriorityWritesAvailability(boolean available) {
+        this.availableForHighPriorityWrites = available;
+    }
+
     @Override
     public boolean isRunning(){
         return running;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
index 7c702da32..49aa66ed2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
@@ -45,7 +45,7 @@
 
     enum BookieMode {
         READ_ONLY,
-        READ_WRITE;
+        READ_WRITE
     }
 
     private static final long INVALID_UPDATE_TIME = -1;
@@ -54,7 +54,6 @@
     private long lastUpdateTime;
     private volatile BookieMode bookieMode;
 
-
     BookieStatus() {
         this.bookieMode = BookieMode.READ_WRITE;
         this.layoutVersion = CURRENT_STATUS_LAYOUT_VERSION;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 2055758c7..66efbaa5b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -439,31 +439,6 @@ public void diskAlmostFull(File disk) {
                     shouldCreateNewEntryLog.set(true);
                 }
             }
-
-            @Override
-            public void diskFailed(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void allDisksFull() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void fatalError() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskJustWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
         };
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 05b5eb570..eb3b9354f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -42,7 +42,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -108,7 +107,6 @@ public IndexPersistenceMgr(int pageSize,
         LOG.info("openFileLimit = {}", openFileLimit);
         // Retrieve all of the active ledgers.
         getActiveLedgers();
-        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
 
         // build the file info cache
         int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
@@ -493,45 +491,6 @@ int getOpenFileLimit() {
         return openFileLimit;
     }
 
-    private LedgerDirsListener getLedgerDirsListener() {
-        return new LedgerDirsListener() {
-            @Override
-            public void diskFull(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskFailed(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void allDisksFull() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void fatalError() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskJustWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-        };
-    }
-
     private void relocateIndexFileAndFlushHeader(long ledger, FileInfo fi) throws IOException {
         File currentDir = getLedgerDirForLedger(fi);
         if (ledgerDirsManager.isDirFull(currentDir)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index ce1a4998e..2c576d7ca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -113,10 +113,6 @@ public void initialize(ServerConfiguration conf,
 
     private LedgerDirsListener getLedgerDirsListener() {
         return new LedgerDirsListener() {
-            @Override
-            public void diskFailed(File disk) {
-                // do nothing.
-            }
 
             @Override
             public void diskAlmostFull(File disk) {
@@ -138,7 +134,7 @@ public void diskFull(File disk) {
             }
 
             @Override
-            public void allDisksFull() {
+            public void allDisksFull(boolean highPriorityWritesAllowed) {
                 if (gcThread.isForceGCAllowWhenNoSpace) {
                     gcThread.enableForceGC();
                 } else {
@@ -147,11 +143,6 @@ public void allDisksFull() {
                 }
             }
 
-            @Override
-            public void fatalError() {
-                // do nothing.
-            }
-
             @Override
             public void diskWritable(File disk) {
                 // we have enough space now
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index 4bd28b2fb..5d2c11ff7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -373,43 +373,49 @@ public NoWritableLedgerDirException(String errMsg) {
          *
          * @param disk Failed disk
          */
-        void diskFailed(File disk);
+        default void diskFailed(File disk) {}
 
         /**
          * Notified when the disk usage warn threshold is exceeded on the drive.
          * @param disk
          */
-        void diskAlmostFull(File disk);
+        default void diskAlmostFull(File disk) {}
 
         /**
          * This will be notified on disk detected as full.
          *
          * @param disk Filled disk
          */
-        void diskFull(File disk);
+        default void diskFull(File disk) {}
 
         /**
          * This will be notified on disk detected as writable and under warn threshold.
          *
          * @param disk Writable disk
          */
-        void diskWritable(File disk);
+        default void diskWritable(File disk) {}
 
         /**
          * This will be notified on disk detected as writable but still in warn threshold.
          *
          * @param disk Writable disk
          */
-        void diskJustWritable(File disk);
+        default void diskJustWritable(File disk) {}
 
         /**
          * This will be notified whenever all disks are detected as full.
+         *
+         * <p>Normal writes will be rejected when disks are detected as "full". High priority writes
+         * such as ledger recovery writes can go through if disks are still available.
+         *
+         * @param highPriorityWritesAllowed indicates whether we still have disk space for high priority
+         *                                  writes even though disks are detected as "full"
          */
-        void allDisksFull();
+        default void allDisksFull(boolean highPriorityWritesAllowed) {}
 
         /**
          * This will notify the fatal errors.
          */
-        void fatalError();
+        default void fatalError() {}
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index afd8ad843..2904643b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -53,6 +53,7 @@
     private final ConcurrentMap<File, Float> diskUsages;
     private final DiskChecker diskChecker;
     private final LedgerDirsManager ldm;
+    private long minUsableSizeForHighPriorityWrites;
     private ScheduledExecutorService executor;
     private ScheduledFuture<?> checkTask;
 
@@ -60,6 +61,7 @@ public LedgerDirsMonitor(final ServerConfiguration conf,
                              final DiskChecker diskChecker,
                              final LedgerDirsManager ldm) {
         this.interval = conf.getDiskCheckInterval();
+        this.minUsableSizeForHighPriorityWrites = conf.getMinUsableSizeForHighPriorityWrites();
         this.conf = conf;
         this.diskChecker = diskChecker;
         this.diskUsages = ldm.getDiskUsages();
@@ -98,8 +100,14 @@ private void check() {
             // bookie cannot get writable dir but considered to be writable
             ldm.getWritableLedgerDirs();
         } catch (NoWritableLedgerDirException e) {
+            boolean highPriorityWritesAllowed = true;
+            try {
+                ldm.getDirsAboveUsableThresholdSize(minUsableSizeForHighPriorityWrites);
+            } catch (NoWritableLedgerDirException e1) {
+                highPriorityWritesAllowed = false;
+            }
             for (LedgerDirsListener listener : ldm.getListeners()) {
-                listener.allDisksFull();
+                listener.allDisksFull(highPriorityWritesAllowed);
             }
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
index ad4ac0c67..538f3ac19 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
@@ -25,12 +25,25 @@
  */
 public interface StateManager extends AutoCloseable {
 
-
     /**
      * Init state of Bookie when launch bookie.
      */
     void initState();
 
+    /**
+     * Check if the bookie is available for high priority writes or not.
+     *
+     * @return true if the bookie is available for high priority writes; otherwise false.
+     */
+    boolean isAvailableForHighPriorityWrites();
+
+    /**
+     * Enable/Disable the availability for high priority writes.
+     *
+     * @param available the flag to enable/disable the availability for high priority writes.
+     */
+    void setHighPriorityWritesAvailability(boolean available);
+
     /**
      * Check is ReadOnly.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 466d46a9f..a7c3a7a56 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -122,7 +122,7 @@ private void flush() {
             ledgerStorage.flush();
         } catch (NoWritableLedgerDirException e) {
             log.error("No writeable ledger directories", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
             return;
         } catch (IOException e) {
             log.error("Exception flushing ledgers", e);
@@ -138,7 +138,7 @@ private void flush() {
             checkpointSource.checkpointComplete(checkpoint, false);
         } catch (IOException e) {
             log.error("Exception marking checkpoint as complete", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
         }
     }
 
@@ -153,7 +153,7 @@ public void checkpoint(Checkpoint checkpoint) {
             ledgerStorage.checkpoint(checkpoint);
         } catch (NoWritableLedgerDirException e) {
             log.error("No writeable ledger directories", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
             return;
         } catch (IOException e) {
             log.error("Exception flushing ledgers", e);
@@ -164,7 +164,7 @@ public void checkpoint(Checkpoint checkpoint) {
             checkpointSource.checkpointComplete(checkpoint, true);
         } catch (IOException e) {
             log.error("Exception marking checkpoint as complete", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index e9950fd1c..cb398216b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -162,6 +162,7 @@
     protected static final String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = "bookieAuthProviderFactoryClass";
 
     protected static final String MIN_USABLESIZE_FOR_INDEXFILE_CREATION = "minUsableSizeForIndexFileCreation";
+    protected static final String MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES = "minUsableSizeForHighPriorityWrites";
 
     protected static final String ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION =
         "allowMultipleDirsUnderSameDiskPartition";
@@ -2523,7 +2524,30 @@ public long getMinUsableSizeForIndexFileCreation() {
      * @return
      */
     public ServerConfiguration setMinUsableSizeForIndexFileCreation(long minUsableSizeForIndexFileCreation) {
-        this.setProperty(MIN_USABLESIZE_FOR_INDEXFILE_CREATION, Long.toString(minUsableSizeForIndexFileCreation));
+        this.setProperty(MIN_USABLESIZE_FOR_INDEXFILE_CREATION, minUsableSizeForIndexFileCreation);
+        return this;
+    }
+
+    /**
+     * Gets the minimum safe usable size to be available in ledger directory for Bookie to accept high priority writes.
+     *
+     * <p>If not set, it defaults to twice the value of {@link #getEntryLogSizeLimit()}.
+     *
+     * @return the minimum safe usable size per ledger directory for bookie to accept high priority writes.
+     */
+    public long getMinUsableSizeForHighPriorityWrites() {
+        return this.getLong(MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES, 2 * getEntryLogSizeLimit());
+    }
+
+    /**
+     * Sets the minimum safe usable size to be available in ledger directory for Bookie to accept high priority writes.
+     *
+     * @param minUsableSizeForHighPriorityWrites minimum safe usable size per ledger directory for Bookie to accept
+     *                                           high priority writes
+     * @return server configuration.
+     */
+    public ServerConfiguration setMinUsableSizeForHighPriorityWrites(long minUsableSizeForHighPriorityWrites) {
+        this.setProperty(MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES, minUsableSizeForHighPriorityWrites);
         return this;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 4102e75c8..edb89247d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -76,6 +76,7 @@
 /**
  * An implementation of the RequestProcessor interface.
  */
+@Getter(AccessLevel.PACKAGE)
 public class BookieRequestProcessor implements RequestProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
@@ -94,7 +95,6 @@
     /**
      * The threadpool used to execute all read entry requests issued to this server.
      */
-    @Getter(AccessLevel.PACKAGE)
     private final OrderedExecutor readThreadPool;
 
     /**
@@ -111,7 +111,6 @@
      * The threadpool used to execute all long poll requests issued to this server
      * after they are done waiting.
      */
-    @Getter(AccessLevel.PACKAGE)
     private final OrderedExecutor longPollThreadPool;
 
     /**
@@ -127,8 +126,8 @@
     // Expose Stats
     private final BKStats bkStats = BKStats.getInstance();
     private final boolean statsEnabled;
-    final OpStatsLogger addRequestStats;
-    final OpStatsLogger addEntryStats;
+    private final OpStatsLogger addRequestStats;
+    private final OpStatsLogger addEntryStats;
     final OpStatsLogger readRequestStats;
     final OpStatsLogger readEntryStats;
     final OpStatsLogger fenceReadRequestStats;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index e84bbee24..9cd9fd5e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -58,9 +58,11 @@ protected void sendResponse(StatusCode code, Object response, OpStatsLogger stat
             public void operationComplete(ChannelFuture future) throws Exception {
                 long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
                 if (!future.isSuccess()) {
-                    requestProcessor.channelWriteStats.registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
+                    requestProcessor.getChannelWriteStats()
+                        .registerFailedEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
                 } else {
-                    requestProcessor.channelWriteStats.registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
+                    requestProcessor.getChannelWriteStats()
+                        .registerSuccessfulEvent(writeElapsedNanos, TimeUnit.NANOSECONDS);
                 }
                 if (StatusCode.EOK == code) {
                     statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index eaf2473a2..f5af75ac4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.Recycler;
@@ -56,12 +57,13 @@ public static WriteEntryProcessor create(ParsedAddRequest request, Channel chann
 
     @Override
     protected void processPacket() {
-        if (requestProcessor.bookie.isReadOnly()) {
+        if (requestProcessor.getBookie().isReadOnly()
+            && !(request.isHighPriority() && requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
             LOG.warn("BookieServer is running in readonly mode,"
                     + " so rejecting the request from the client!");
             sendResponse(BookieProtocol.EREADONLY,
                          ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
-                         requestProcessor.addRequestStats);
+                         requestProcessor.getAddRequestStats());
             request.release();
             request.recycle();
             return;
@@ -72,9 +74,9 @@ protected void processPacket() {
         ByteBuf addData = request.getData();
         try {
             if (request.isRecoveryAdd()) {
-                requestProcessor.bookie.recoveryAddEntry(addData, this, channel, request.getMasterKey());
+                requestProcessor.getBookie().recoveryAddEntry(addData, this, channel, request.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(addData, false, this, channel, request.getMasterKey());
+                requestProcessor.getBookie().addEntry(addData, false, this, channel, request.getMasterKey());
             }
         } catch (OperationRejectedException e) {
             // Avoid to log each occurence of this exception as this can happen when the ledger storage is
@@ -102,11 +104,11 @@ protected void processPacket() {
         }
 
         if (rc != BookieProtocol.EOK) {
-            requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+            requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
             sendResponse(rc,
                          ResponseBuilder.buildErrorResponse(rc, request),
-                         requestProcessor.addRequestStats);
+                         requestProcessor.getAddRequestStats());
             request.recycle();
         }
     }
@@ -115,15 +117,15 @@ protected void processPacket() {
     public void writeComplete(int rc, long ledgerId, long entryId,
                               BookieSocketAddress addr, Object ctx) {
         if (BookieProtocol.EOK == rc) {
-            requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+            requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
         } else {
-            requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+            requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
         }
         sendResponse(rc,
                      ResponseBuilder.buildAddResponse(request),
-                     requestProcessor.addRequestStats);
+                     requestProcessor.getAddRequestStats());
         request.recycle();
         recycle();
     }
@@ -134,7 +136,8 @@ public String toString() {
                              request.getLedgerId(), request.getEntryId());
     }
 
-    private void recycle() {
+    @VisibleForTesting
+    void recycle() {
         reset();
         recyclerHandle.recycle(this);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 0854bf538..9267d4549 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -65,7 +65,9 @@ private AddResponse getAddResponse() {
             return addResponse.build();
         }
 
-        if (requestProcessor.bookie.isReadOnly()) {
+        if (requestProcessor.getBookie().isReadOnly()
+            && !(RequestUtils.isHighPriority(request)
+                    && requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
             logger.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
             addResponse.setStatus(StatusCode.EREADONLY);
             return addResponse.build();
@@ -76,10 +78,10 @@ private AddResponse getAddResponse() {
             public void writeComplete(int rc, long ledgerId, long entryId,
                                       BookieSocketAddress addr, Object ctx) {
                 if (BookieProtocol.EOK == rc) {
-                    requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                             TimeUnit.NANOSECONDS);
                 } else {
-                    requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                             TimeUnit.NANOSECONDS);
                 }
 
@@ -101,7 +103,7 @@ public void writeComplete(int rc, long ledgerId, long entryId,
                         .setStatus(addResponse.getStatus())
                         .setAddResponse(addResponse);
                 Response resp = response.build();
-                sendResponse(status, resp, requestProcessor.addRequestStats);
+                sendResponse(status, resp, requestProcessor.getAddRequestStats());
             }
         };
         final EnumSet<WriteFlag> writeFlags;
@@ -116,9 +118,9 @@ public void writeComplete(int rc, long ledgerId, long entryId,
         ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
         try {
             if (RequestUtils.hasFlag(addRequest, AddRequest.Flag.RECOVERY_ADD)) {
-                requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
+                requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
             } else {
-                requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey);
+                requestProcessor.getBookie().addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey);
             }
             status = StatusCode.EOK;
         } catch (OperationRejectedException e) {
@@ -167,7 +169,7 @@ public void safeRun() {
                     .setAddResponse(addResponse);
             Response resp = response.build();
             sendResponse(addResponse.getStatus(), resp,
-                         requestProcessor.addRequestStats);
+                         requestProcessor.getAddRequestStats());
         }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index ad7ba1e53..ce084b35c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -167,35 +167,17 @@ public void testStorageThresholdCompaction() throws Exception {
         final CountDownLatch diskWritable = new CountDownLatch(1);
         final CountDownLatch diskFull = new CountDownLatch(1);
         ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() {
-            @Override
-            public void fatalError() {
-            }
 
             @Override
             public void diskWritable(File disk) {
                 diskWritable.countDown();
             }
 
-            @Override
-            public void diskJustWritable(File disk) {
-            }
-
             @Override
             public void diskFull(File disk) {
                 diskFull.countDown();
             }
 
-            @Override
-            public void diskFailed(File disk) {
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-            }
-
-            @Override
-            public void allDisksFull() {
-            }
         });
 
         // Dependency Injected class
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 4efc69f62..1731b9b54 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -181,20 +181,42 @@ public void testGetWritableDirForLog() throws Exception {
 
     @Test
     public void testLedgerDirsMonitorDuringTransition() throws Exception {
+        testLedgerDirsMonitorDuringTransition(true);
+    }
+
+    @Test
+    public void testHighPriorityWritesDisallowedDuringTransition() throws Exception {
+        testLedgerDirsMonitorDuringTransition(false);
+    }
+
+    private void testLedgerDirsMonitorDuringTransition(boolean highPriorityWritesAllowed) throws Exception {
+        if (!highPriorityWritesAllowed) {
+            ledgerMonitor.shutdown();
+            conf.setMinUsableSizeForHighPriorityWrites(curDir.getUsableSpace() + 1024);
+            dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), statsLogger);
+            ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, dirsManager);
+            ledgerMonitor.init();
+        }
+
         MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
         dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
         ledgerMonitor.start();
 
         assertFalse(mockLedgerDirsListener.readOnly);
-        mockDiskChecker.setUsage(threshold + 0.05f);
+        assertTrue(mockLedgerDirsListener.highPriorityWritesAllowed);
 
+        mockDiskChecker.setUsage(threshold + 0.05f);
         executorController.advance(Duration.ofMillis(diskCheckInterval));
+
         assertTrue(mockLedgerDirsListener.readOnly);
+        assertEquals(highPriorityWritesAllowed, mockLedgerDirsListener.highPriorityWritesAllowed);
 
         mockDiskChecker.setUsage(threshold - 0.05f);
         executorController.advance(Duration.ofMillis(diskCheckInterval));
 
         assertFalse(mockLedgerDirsListener.readOnly);
+        assertTrue(mockLedgerDirsListener.highPriorityWritesAllowed);
     }
 
     @Test
@@ -427,45 +449,34 @@ public void setUsageMap(Map<File, Float> usageMap) {
 
     private class MockLedgerDirsListener implements LedgerDirsListener {
 
+        public volatile boolean highPriorityWritesAllowed;
         public volatile boolean readOnly;
 
         public MockLedgerDirsListener() {
             reset();
         }
 
-        @Override
-        public void diskFailed(File disk) {
-        }
-
-        @Override
-        public void diskAlmostFull(File disk) {
-        }
-
-        @Override
-        public void diskFull(File disk) {
-        }
-
         @Override
         public void diskWritable(File disk) {
             readOnly = false;
+            highPriorityWritesAllowed = true;
         }
 
         @Override
         public void diskJustWritable(File disk) {
             readOnly = false;
+            highPriorityWritesAllowed = true;
         }
 
         @Override
-        public void allDisksFull() {
-            readOnly = true;
-        }
-
-        @Override
-        public void fatalError() {
+        public void allDisksFull(boolean highPriorityWritesAllowed) {
+            this.readOnly = true;
+            this.highPriorityWritesAllowed = highPriorityWritesAllowed;
         }
 
         public void reset() {
             readOnly = false;
+            highPriorityWritesAllowed = true;
         }
 
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index e6cb93d95..d9fa8cc67 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,7 +26,6 @@
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
-import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -81,7 +80,7 @@ public void testSyncThreadLongShutdown() throws Exception {
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
-        LedgerDirsListener listener = new DummyLedgerDirsListener();
+        LedgerDirsListener listener = new LedgerDirsListener() {};
 
         final CountDownLatch checkpointCalledLatch = new CountDownLatch(1);
         final CountDownLatch checkpointLatch = new CountDownLatch(1);
@@ -154,7 +153,7 @@ public void testSyncThreadSuspension() throws Exception {
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
-        LedgerDirsListener listener = new DummyLedgerDirsListener();
+        LedgerDirsListener listener = new LedgerDirsListener() {};
 
         final AtomicInteger checkpointCount = new AtomicInteger(0);
         LedgerStorage storage = new DummyLedgerStorage() {
@@ -200,7 +199,7 @@ public void testSyncThreadShutdownOnError() throws Exception {
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
         final CountDownLatch fatalLatch = new CountDownLatch(1);
-        LedgerDirsListener listener = new DummyLedgerDirsListener() {
+        LedgerDirsListener listener = new LedgerDirsListener() {
                 @Override
                 public void fatalError() {
                     fatalLatch.countDown();
@@ -232,9 +231,9 @@ public void testSyncThreadDisksFull() throws Exception {
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
         final CountDownLatch diskFullLatch = new CountDownLatch(1);
-        LedgerDirsListener listener = new DummyLedgerDirsListener() {
+        LedgerDirsListener listener = new LedgerDirsListener() {
                 @Override
-                public void allDisksFull() {
+                public void allDisksFull(boolean highPriorityWritesAllowed) {
                     diskFullLatch.countDown();
                 }
             };
@@ -363,34 +362,4 @@ public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
         }
     }
 
-    private static class DummyLedgerDirsListener
-        implements LedgerDirsManager.LedgerDirsListener {
-        @Override
-        public void diskFailed(File disk) {
-        }
-
-        @Override
-        public void diskAlmostFull(File disk) {
-        }
-
-        @Override
-        public void diskFull(File disk) {
-        }
-
-        @Override
-        public void allDisksFull() {
-        }
-
-        @Override
-        public void fatalError() {
-        }
-
-        @Override
-        public void diskWritable(File disk) {
-        }
-
-        @Override
-        public void diskJustWritable(File disk) {
-        }
-    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
new file mode 100644
index 000000000..5901c2f58
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
+import org.apache.bookkeeper.proto.BookieProtocol.Response;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link WriteEntryProcessor}.
+ */
+public class WriteEntryProcessorTest {
+
+    private ParsedAddRequest request;
+    private WriteEntryProcessor processor;
+    private Channel channel;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    @Before
+    public void setup() {
+        request = ParsedAddRequest.create(
+            BookieProtocol.CURRENT_PROTOCOL_VERSION,
+            System.currentTimeMillis(),
+            System.currentTimeMillis() + 1,
+            (short) 0,
+            new byte[0],
+            Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
+        channel = mock(Channel.class);
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getAddEntryStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
+        when(requestProcessor.getAddRequestStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+        processor = WriteEntryProcessor.create(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    private void reinitRequest(short flags) {
+        request.release();
+        request.recycle();
+        processor.recycle();
+
+        request = ParsedAddRequest.create(
+            BookieProtocol.CURRENT_PROTOCOL_VERSION,
+            System.currentTimeMillis(),
+            System.currentTimeMillis() + 1,
+            flags,
+            new byte[0],
+            Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
+        processor = WriteEntryProcessor.create(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    @Test
+    public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EREADONLY, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+    @Test
+    public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallowed() throws Exception {
+        reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EREADONLY, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+    @Test
+    public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed() throws Exception {
+        reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
+            return null;
+        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EOK, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+    @Test
+    public void testNormalWritesOnWritableBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
+            return null;
+        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EOK, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
new file mode 100644
index 000000000..8f54ddbc4
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link WriteEntryProcessorV3}.
+ */
+public class WriteEntryProcessorV3Test {
+
+    private Request request;
+    private WriteEntryProcessorV3 processor;
+    private Channel channel;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    @Before
+    public void setup() {
+        request = Request.newBuilder()
+            .setHeader(BKPacketHeader.newBuilder()
+                .setTxnId(System.currentTimeMillis())
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.ADD_ENTRY)
+                .build())
+            .setAddRequest(AddRequest.newBuilder()
+                .setLedgerId(System.currentTimeMillis())
+                .setEntryId(System.currentTimeMillis() + 1)
+                .setBody(ByteString.copyFromUtf8("test-entry-data"))
+                .setMasterKey(ByteString.copyFrom(new byte[0]))
+                .build())
+            .build();
+        channel = mock(Channel.class);
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getAddEntryStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
+        when(requestProcessor.getAddRequestStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+        processor = new WriteEntryProcessorV3(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    private void reinitRequest(int priority) {
+        request = Request.newBuilder(request)
+            .setHeader(BKPacketHeader.newBuilder(request.getHeader())
+                .setPriority(priority)
+                .build())
+            .build();
+
+        processor = new WriteEntryProcessorV3(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    @Test
+    public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EREADONLY, response.getStatus());
+    }
+
+    @Test
+    public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallowed() throws Exception {
+        reinitRequest(100);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EREADONLY, response.getStatus());
+    }
+
+    @Test
+    public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed() throws Exception {
+        reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        doAnswer(invocationOnMock -> {
+            WriteCallback wc = invocationOnMock.getArgument(2);
+
+            wc.writeComplete(
+                0,
+                request.getAddRequest().getLedgerId(),
+                request.getAddRequest().getEntryId(),
+                null,
+                null);
+            return null;
+        }).when(bookie).addEntry(
+            any(ByteBuf.class),
+            eq(false),
+            any(WriteCallback.class),
+            same(channel),
+            eq(new byte[0]));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EOK, response.getStatus());
+    }
+
+    @Test
+    public void testNormalWritesOnWritableBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            WriteCallback wc = invocationOnMock.getArgument(2);
+
+            wc.writeComplete(
+                0,
+                request.getAddRequest().getLedgerId(),
+                request.getAddRequest().getEntryId(),
+                null,
+                null);
+            return null;
+        }).when(bookie).addEntry(
+            any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EOK, response.getStatus());
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services