Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/22 06:17:58 UTC

[GitHub] sijie closed pull request #1280: Improve write rejection in DbLedgerStorage

sijie closed pull request #1280: Improve write rejection in DbLedgerStorage 
URL: https://github.com/apache/bookkeeper/pull/1280

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
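
At its core, the patch replaces the ReentrantReadWriteLock/Condition pair that
guarded write-cache rotation with a java.util.concurrent.locks.StampedLock, so
that the hot add-entry path can use optimistic reads, and it bounds how long a
writer may be throttled before the write is rejected with the new
OperationRejectedException. For readers unfamiliar with StampedLock, here is a
minimal sketch of the optimistic-read pattern the patch relies on (the
CacheHolder class and all of its names are illustrative only, not part of the
patch):

    import java.util.concurrent.locks.StampedLock;

    class CacheHolder<T> {
        private final StampedLock lock = new StampedLock();
        private volatile T current;

        CacheHolder(T initial) {
            this.current = initial;
        }

        T read() {
            long stamp = lock.tryOptimisticRead();
            T snapshot = current; // unsynchronized read, validated below
            if (!lock.validate(stamp)) {
                // A writer rotated the reference while we were reading:
                // fall back to a real read lock for a consistent view.
                stamp = lock.readLock();
                try {
                    snapshot = current;
                } finally {
                    lock.unlockRead(stamp);
                }
            }
            return snapshot;
        }

        void rotate(T replacement) {
            long stamp = lock.writeLock();
            try {
                current = replacement;
            } finally {
                lock.unlockWrite(stamp);
            }
        }
    }

The fast path costs a volatile read plus a validate() check; the full read
lock is only taken in the rare window where a flush is rotating the caches.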


diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 99ae39e76..b7428b282 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -82,6 +82,7 @@ public static BookieException create(int code) {
         int CookieNotFoundException = -105;
         int MetadataStoreException = -106;
         int UnknownBookieIdException = -107;
+        int OperationRejectedException = -108;
     }
 
     public void setCode(int code) {
@@ -122,6 +123,9 @@ public String getMessage(int code) {
         case Code.UnknownBookieIdException:
             err = "Unknown bookie id";
             break;
+        case Code.OperationRejectedException:
+            err = "Operation rejected";
+            break;
         default:
             err = "Invalid operation";
             break;
@@ -174,6 +178,22 @@ public LedgerFencedException() {
         }
     }
 
+    /**
+     * Signals that a write operation was rejected because the bookie's ledger storage could not keep up with the write rate.
+     */
+    public static class OperationRejectedException extends BookieException {
+        public OperationRejectedException() {
+            super(Code.OperationRejectedException);
+        }
+
+        @Override
+        public Throwable fillInStackTrace() {
+            // Since this exception is a way to signal a specific condition and it's triggered at very specific
+            // points, we can disable stack traces.
+            return null;
+        }
+    }
+
     /**
      * Signal that an invalid cookie is found when starting a bookie.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index f4db7a689..23840be27 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -73,7 +73,7 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) {
      */
     abstract SettableFuture<Boolean> fenceAndLogInJournal(Journal journal) throws IOException;
 
-    abstract long addEntry(ByteBuf entry) throws IOException;
+    abstract long addEntry(ByteBuf entry) throws IOException, BookieException;
     abstract ByteBuf readEntry(long entryId) throws IOException;
 
     abstract long getLastAddConfirmed() throws IOException;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 84730cb76..a0f34eab6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -138,7 +138,7 @@ ByteBuf getExplicitLac() {
     }
 
     @Override
-    long addEntry(ByteBuf entry) throws IOException {
+    long addEntry(ByteBuf entry) throws IOException, BookieException {
         long ledgerId = entry.getLong(entry.readerIndex());
 
         if (ledgerId != this.ledgerId) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 83ac2c0ae..dcfac3147 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -100,7 +100,7 @@ void initialize(ServerConfiguration conf,
      *
      * @return the entry id of the entry added
      */
-    long addEntry(ByteBuf entry) throws IOException;
+    long addEntry(ByteBuf entry) throws IOException, BookieException;
 
     /**
      * Read an entry from storage.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index ff9cd316a..55b094587 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -42,13 +42,13 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.StampedLock;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
@@ -67,6 +67,7 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -78,8 +79,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of LedgerStorage that uses RocksDB to keep the indexes for
- * entries stored in EntryLogs.
+ * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
  */
 public class DbLedgerStorage implements CompactableLedgerStorage {
 
@@ -90,8 +90,7 @@
      *
      * <p>This class is used for holding all the transient states for a given ledger.
      */
-    private static class TransientLedgerInfo
-            extends Watchable<LastAddConfirmedUpdateNotification>
+    private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
             implements AutoCloseable {
 
         // lac
@@ -139,8 +138,7 @@ long setLastAddConfirmed(long lac) {
         }
 
         synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                                                           Watcher<LastAddConfirmedUpdateNotification> watcher)
-                throws IOException {
+                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
             lastAccessed = System.currentTimeMillis();
             if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
                 return false;
@@ -155,7 +153,7 @@ public ByteBuf getExplicitLac() {
             synchronized (this) {
                 if (explicitLac != null) {
                     retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); //copy from the beginning
+                    explicitLac.rewind(); // copy from the beginning
                     retLac.writeBytes(explicitLac);
                     explicitLac.rewind();
                     return retLac;
@@ -194,13 +192,13 @@ void notifyWatchers(long lastAddConfirmed) {
 
         @Override
         public void close() {
-           synchronized (this) {
-               if (isClosed) {
-                   return;
-               }
-               isClosed = true;
-           }
-           // notify watchers
+            synchronized (this) {
+                if (isClosed) {
+                    return;
+                }
+                isClosed = true;
+            }
+            // notify watchers
             notifyWatchers(Long.MAX_VALUE);
         }
 
@@ -217,16 +215,15 @@ public void close() {
     private GarbageCollectorThread gcThread;
 
     // Write cache where all new entries are inserted into
-    protected WriteCache writeCache;
+    protected volatile WriteCache writeCache;
 
     // Write cache that is used to swap with writeCache during flushes
-    protected WriteCache writeCacheBeingFlushed;
+    protected volatile WriteCache writeCacheBeingFlushed;
 
     // Cache where we insert entries for speculative reading
     private ReadCache readCache;
 
-    private final ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock();
-    private final Condition flushWriteCacheCondition = writeCacheMutex.writeLock().newCondition();
+    private final StampedLock writeCacheRotationLock = new StampedLock();
 
     protected final ReentrantLock flushMutex = new ReentrantLock();
 
@@ -243,10 +240,14 @@ public void close() {
     static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
     static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
 
+    static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
+
     private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
     private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
     private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
 
+    private static final long DEFAULT_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
+
     private static final int MB = 1024 * 1024;
 
     private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
@@ -260,6 +261,8 @@ public void close() {
     private long readCacheMaxSize;
     private int readAheadCacheBatchSize;
 
+    private long maxThrottleTimeNanos;
+
     private StatsLogger stats;
 
     private OpStatsLogger addEntryStats;
@@ -271,10 +274,13 @@ public void close() {
     private OpStatsLogger flushStats;
     private OpStatsLogger flushSizeStats;
 
+    private Counter throttledWriteRequests;
+    private Counter rejectedWriteRequests;
+
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
-        LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-                           Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+            LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
+            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
 
@@ -290,6 +296,9 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
         readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
         readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
 
+        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAULT_MAX_THROTTLE_TIME_MILLIS);
+        maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
+
         readCache = new ReadCache(readCacheMaxSize);
 
         this.stats = statsLogger;
@@ -381,6 +390,9 @@ public Long getSample() {
         readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
         flushStats = stats.getOpStatsLogger("flush");
         flushSizeStats = stats.getOpStatsLogger("flush-size");
+
+        throttledWriteRequests = stats.getCounter("throttled-write-requests");
+        rejectedWriteRequests = stats.getCounter("rejected-write-requests");
     }
 
     @Override
@@ -467,7 +479,7 @@ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
     }
 
     @Override
-    public long addEntry(ByteBuf entry) throws IOException {
+    public long addEntry(ByteBuf entry) throws IOException, BookieException {
         long startTime = MathUtils.nowInNano();
 
         long ledgerId = entry.getLong(entry.readerIndex());
@@ -478,13 +490,23 @@ public long addEntry(ByteBuf entry) throws IOException {
             log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
         }
 
-        // Waits if the write cache is being switched for a flush
-        writeCacheMutex.readLock().lock();
-        boolean inserted;
-        try {
-            inserted = writeCache.put(ledgerId, entryId, entry);
-        } finally {
-            writeCacheMutex.readLock().unlock();
+        // First we try to use optimistic locking to get access to the current write cache.
+        // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
+        // rest of the time, we can have multiple threads using the optimistic lock here without interfering.
+        long stamp = writeCacheRotationLock.tryOptimisticRead();
+        boolean inserted = writeCache.put(ledgerId, entryId, entry);
+        if (!writeCacheRotationLock.validate(stamp)) {
+            // The write cache was rotated while we were inserting. We need to acquire the proper read lock and
+            // repeat the operation, because we might have inserted into a write cache that was already being flushed
+            // and cleared, without knowing whether this last entry was flushed or not.
+            stamp = writeCacheRotationLock.readLock();
+            try {
+                inserted = writeCache.put(ledgerId, entryId, entry);
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
+            }
         }
 
         if (!inserted) {
@@ -498,46 +520,49 @@ public long addEntry(ByteBuf entry) throws IOException {
         return entryId;
     }
 
-    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException {
+    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
+            throws IOException, BookieException {
         // Write cache is full, we need to trigger a flush so that it gets rotated
-        writeCacheMutex.writeLock().lock();
+        // If the flush has already been triggered, or a flush has already switched the
+        // cache, we don't need to trigger another one.
+        if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
+            // Trigger an early flush in background
+            log.info("Write cache is full, triggering flush");
+            executor.execute(() -> {
+                try {
+                    flush();
+                } catch (IOException e) {
+                    log.error("Error during flush", e);
+                }
+            });
+        }
 
-        try {
-            // If the flush has already been triggered or flush has already switched the
-            // cache, we don't need to
-            // trigger another flush
-            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
-                // Trigger an early flush in background
-                log.info("Write cache is full, triggering flush");
-                executor.execute(() -> {
-                    try {
-                        flush();
-                    } catch (IOException e) {
-                        log.error("Error during flush", e);
-                    }
-                });
-            }
+        throttledWriteRequests.inc();
+        long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
 
-            long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
-            while (hasFlushBeenTriggered.get()) {
-                if (timeoutNs <= 0L) {
-                    throw new IOException("Write cache was not trigger within the timeout, cannot add entry " + ledgerId
-                            + "@" + entryId);
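+        // Retry the insert until the background flush rotates the cache or the throttle deadline expires.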
+        while (System.nanoTime() < absoluteTimeoutNanos) {
+            long stamp = writeCacheRotationLock.readLock();
+            try {
+                if (writeCache.put(ledgerId, entryId, entry)) {
+                    // We succeeded in putting the entry in the write cache
+                    return;
                 }
-                timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs);
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
             }
 
-            if (!writeCache.put(ledgerId, entryId, entry)) {
-                // Still wasn't able to cache entry
-                throw new IOException("Error while inserting entry in write cache" + ledgerId + "@" + entryId);
+            // Wait some time and try again
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
             }
-
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
-        } finally {
-            writeCacheMutex.writeLock().unlock();
         }
+
+        // Timeout expired and we weren't able to insert in write cache
+        rejectedWriteRequests.inc();
+        throw new OperationRejectedException();
     }
 
     @Override
@@ -551,29 +576,41 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
             return getLastEntry(ledgerId);
         }
 
-        writeCacheMutex.readLock().lock();
-        try {
-            // First try to read from the write cache of recent entries
-            ByteBuf entry = writeCache.get(ledgerId, entryId);
-            if (entry != null) {
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
+        // We need to try to read from both write caches, since recent entries could be found in either of the two. The
+        // write caches are already thread safe on their own, here we just need to make sure we get references to both
+        // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
+        long stamp = writeCacheRotationLock.tryOptimisticRead();
+        WriteCache localWriteCache = writeCache;
+        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+        if (!writeCacheRotationLock.validate(stamp)) {
+            // Fallback to regular read lock approach
+            stamp = writeCacheRotationLock.readLock();
+            try {
+                localWriteCache = writeCache;
+                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
+            } finally {
+                writeCacheRotationLock.unlockRead(stamp);
             }
+        }
 
-            // If there's a flush going on, the entry might be in the flush buffer
-            entry = writeCacheBeingFlushed.get(ledgerId, entryId);
-            if (entry != null) {
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-        } finally {
-            writeCacheMutex.readLock().unlock();
+        // First try to read from the write cache of recent entries
+        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // If there's a flush going on, the entry might be in the flush buffer
+        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
         }
 
         // Try reading from read-ahead cache
-        ByteBuf entry = readCache.get(ledgerId, entryId);
+        entry = readCache.get(ledgerId, entryId);
         if (entry != null) {
             recordSuccessfulEvent(readCacheHitStats, startTime);
             recordSuccessfulEvent(readEntryStats, startTime);
@@ -650,7 +687,7 @@ private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long fi
     public ByteBuf getLastEntry(long ledgerId) throws IOException {
         long startTime = MathUtils.nowInNano();
 
-        writeCacheMutex.readLock().lock();
+        long stamp = writeCacheRotationLock.readLock();
         try {
             // First try to read from the write cache of recent entries
             ByteBuf entry = writeCache.getLastEntry(ledgerId);
@@ -687,7 +724,7 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException {
                 return entry;
             }
         } finally {
-            writeCacheMutex.readLock().unlock();
+            writeCacheRotationLock.unlockRead(stamp);
         }
 
         // Search the last entry in storage
@@ -706,11 +743,11 @@ public ByteBuf getLastEntry(long ledgerId) throws IOException {
 
     @VisibleForTesting
     boolean isFlushRequired() {
-        writeCacheMutex.readLock().lock();
+        long stamp = writeCacheRotationLock.readLock();
         try {
             return !writeCache.isEmpty();
         } finally {
-            writeCacheMutex.readLock().unlock();
+            writeCacheRotationLock.unlockRead(stamp);
         }
     }
 
@@ -810,7 +847,7 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
      * Swap the current write cache with the replacement cache.
      */
     private void swapWriteCache() {
-        writeCacheMutex.writeLock().lock();
+        long stamp = writeCacheRotationLock.writeLock();
         try {
             // First, swap the current write-cache map with an empty one so that writes will
             // go on unaffected. Only a single flush is happening at the same time
@@ -820,12 +857,11 @@ private void swapWriteCache() {
 
             // since the cache is switched, we can allow flush to be triggered
             hasFlushBeenTriggered.set(false);
-            flushWriteCacheCondition.signalAll();
         } finally {
             try {
                 isFlushOngoing.set(true);
             } finally {
-                writeCacheMutex.writeLock().unlock();
+                writeCacheRotationLock.unlockWrite(stamp);
             }
         }
     }
@@ -844,11 +880,11 @@ public void deleteLedger(long ledgerId) throws IOException {
         }
 
         // Delete entries from this ledger that are still in the write cache
-        writeCacheMutex.readLock().lock();
+        long stamp = writeCacheRotationLock.readLock();
         try {
             writeCache.deleteLedger(ledgerId);
         } finally {
-            writeCacheMutex.readLock().unlock();
+            writeCacheRotationLock.unlockRead(stamp);
         }
 
         entryLocationIndex.delete(ledgerId);
@@ -944,7 +980,6 @@ private void updateCachedLacIfNeeded(long ledgerId, long lac) {
         }
     }
 
-
     @Override
     public void flushEntriesLocationsIndex() throws IOException {
         // No-op. Location index is already flushed in updateEntriesLocations() call
@@ -953,8 +988,7 @@ public void flushEntriesLocationsIndex() throws IOException {
     /**
      * Add an already existing ledger to the index.
      *
-     * <p>This method is only used as a tool to help the migration from
-     * InterleaveLedgerStorage to DbLedgerStorage
+     * <p>This method is only used as a tool to help the migration from InterleaveLedgerStorage to DbLedgerStorage
      *
      * @param ledgerId
      *            the ledger id
@@ -988,6 +1022,7 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
 
         return numberOfEntries.get();
     }
+
     @Override
     public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
         ledgerDeletionListeners.add(listener);
@@ -1020,7 +1055,7 @@ public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration ser
         checkNotNull(processor, "LedgerLoggger info processor can't null");
 
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
-            new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
+                new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
         String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
 
         EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 993700092..eaf2473a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -20,12 +20,12 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -76,6 +76,13 @@ protected void processPacket() {
             } else {
                 requestProcessor.bookie.addEntry(addData, false, this, channel, request.getMasterKey());
             }
+        } catch (OperationRejectedException e) {
+            // Avoid logging each occurrence of this exception as this can happen when the ledger storage is
+            // unable to keep up with the write rate.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Operation rejected while writing {}", request, e);
+            }
+            rc = BookieProtocol.EIO;
         } catch (IOException e) {
             LOG.error("Error writing {}", request, e);
             rc = BookieProtocol.EIO;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 7bd78d3f4..0854bf538 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -120,6 +121,13 @@ public void writeComplete(int rc, long ledgerId, long entryId,
                 requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey);
             }
             status = StatusCode.EOK;
+        } catch (OperationRejectedException e) {
+            // Avoid logging each occurrence of this exception as this can happen when the ledger storage is
+            // unable to keep up with the write rate.
+            if (logger.isDebugEnabled()) {
+                logger.debug("Operation rejected while writing {}", request, e);
+            }
+            status = StatusCode.EIO;
         } catch (IOException e) {
             logger.error("Error writing entry:{} to ledger:{}",
                     entryId, ledgerId, e);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index d4a98cee0..cc02e4e8c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -413,7 +413,7 @@ public void testGetEntryLogsSet() throws Exception {
         }
 
         @Override
-        public Boolean call() throws IOException {
+        public Boolean call() throws IOException, BookieException {
             try {
                 ledgerStorage.addEntry(generateEntry(ledgerId, entryId));
             } catch (IOException e) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index df810826d..4894814d7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -30,6 +30,7 @@
 import java.io.IOException;
 
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.After;
@@ -85,6 +86,7 @@ public void setup() throws Exception {
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName());
         conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1);
+        conf.setProperty(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS, 1000);
         conf.setLedgerDirNames(new String[] { tmpDir.toString() });
         Bookie bookie = new Bookie(conf);
 
@@ -131,7 +133,7 @@ public void writeCacheFull() throws Exception {
         try {
             storage.addEntry(entry);
             fail("Should have thrown exception");
-        } catch (IOException e) {
+        } catch (OperationRejectedException e) {
             // Expected
         }
     }
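
The other half of the change is the bounded throttle-then-reject loop in
triggerFlushAndAddEntry(): a full write cache triggers at most one background
flush, and the writer keeps retrying the insert for up to
dbStorage_maxThrottleTimeMs (10 seconds by default) before giving up with
OperationRejectedException, which the write processors map to EIO. A
self-contained sketch of that loop (tryInsert() and the capacity counter are
stand-ins for the write cache, not the patch's API):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    class ThrottledWriter {
        // Stand-in for the write cache: accepts a bounded number of entries.
        private final AtomicInteger used = new AtomicInteger();
        private final int capacity = 1000;

        private boolean tryInsert() {
            // Analogous to writeCache.put(...): returns false when full.
            for (;;) {
                int u = used.get();
                if (u >= capacity) {
                    return false;
                }
                if (used.compareAndSet(u, u + 1)) {
                    return true;
                }
            }
        }

        void insertOrReject(long maxThrottleTimeMillis) throws IOException {
            long deadline = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
            while (System.nanoTime() < deadline) {
                if (tryInsert()) {
                    return; // the cache accepted the entry
                }
                try {
                    // Brief back-off while the background flush rotates
                    // the cache and frees space.
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while throttled", e);
                }
            }
            // Deadline passed: fail fast instead of blocking indefinitely.
            // (The patch throws OperationRejectedException here.)
            throw new IOException("Operation rejected");
        }
    }

Because the rejection is an expected overload signal rather than a fault, the
new exception also overrides fillInStackTrace() to skip stack-trace capture,
keeping the rejection path cheap.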

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services