You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/11/13 19:02:09 UTC

[GitHub] sijie closed pull request #706: Bookies should not queue read request indefinitely

sijie closed pull request #706: Bookies should not queue read request indefinitely
URL: https://github.com/apache/bookkeeper/pull/706
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index b369630e8..9051b59a6 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -47,6 +47,11 @@
       <version>${commons-lang3.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.errorprone</groupId>
+      <artifactId>error_prone_annotations</artifactId>
+      <version>2.1.2</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
new file mode 100644
index 000000000..b7b91510e
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implements {@link ListeningScheduledExecutorService} and allows limiting the number
+ * of tasks to be scheduled in the thread's queue.
+ *
+ */
+public class BoundedScheduledExecutorService extends ForwardingListeningExecutorService
+        implements ListeningScheduledExecutorService {
+    BlockingQueue<Runnable> queue;
+    ListeningScheduledExecutorService thread;
+    int maxTasksInQueue;
+
+    public BoundedScheduledExecutorService(ScheduledThreadPoolExecutor thread, int maxTasksInQueue) {
+        this.queue = thread.getQueue();
+        this.thread = MoreExecutors.listeningDecorator(thread);
+        this.maxTasksInQueue = maxTasksInQueue;
+    }
+
+    @Override
+    protected ListeningExecutorService delegate() {
+        return this.thread;
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        this.checkQueue();
+        return this.thread.schedule(command, delay, unit);
+    }
+
+    @Override
+    public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        this.checkQueue();
+        return this.thread.schedule(callable, delay, unit);
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                            long initialDelay, long period, TimeUnit unit) {
+        this.checkQueue();
+        return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit);
+    }
+
+    @Override
+    public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                               long initialDelay, long delay, TimeUnit unit) {
+        this.checkQueue();
+        return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit);
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Callable<T> task) {
+        this.checkQueue();
+        return super.submit(task);
+    }
+
+    @Override
+    public ListenableFuture<?> submit(Runnable task) {
+        this.checkQueue();
+        return super.submit(task);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        this.checkQueue();
+        return super.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+                                         long timeout, TimeUnit unit) throws InterruptedException {
+        this.checkQueue();
+        return super.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        this.checkQueue();
+        return super.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+                           TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        this.checkQueue();
+        return super.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Runnable task, T result) {
+        this.checkQueue();
+        return super.submit(task, result);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        this.checkQueue();
+        super.execute(command);
+    }
+
+    private void checkQueue() {
+        if (this.maxTasksInQueue > 0 && this.queue.size() >= this.maxTasksInQueue) {
+            throw new RejectedExecutionException("Queue at limit of " + this.maxTasksInQueue + " items");
+        }
+    }
+
+}
+
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index 41a7fa06f..fb07b1f73 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -21,7 +21,6 @@
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
@@ -31,6 +30,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -52,7 +52,7 @@
  * method.
  */
 public class OrderedScheduler {
-
+    public static final int NO_TASK_LIMIT = -1;
     protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
 
     final String name;
@@ -63,6 +63,7 @@
     final OpStatsLogger taskPendingStats;
     final boolean traceTaskExecution;
     final long warnTimeMicroSec;
+    final int maxTasksInQueue;
 
     /**
      * Create a builder to build ordered scheduler.
@@ -88,6 +89,7 @@ public static SchedulerBuilder newSchedulerBuilder() {
         protected StatsLogger statsLogger = NullStatsLogger.INSTANCE;
         protected boolean traceTaskExecution = false;
         protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+        protected int maxTasksInQueue = NO_TASK_LIMIT;
 
         public AbstractBuilder<T> name(String name) {
             this.name = name;
@@ -99,6 +101,11 @@ public static SchedulerBuilder newSchedulerBuilder() {
             return this;
         }
 
+        public AbstractBuilder<T> maxTasksInQueue(int num) {
+            this.maxTasksInQueue = num;
+            return this;
+        }
+
         public AbstractBuilder<T> threadFactory(ThreadFactory threadFactory) {
             this.threadFactory = threadFactory;
             return this;
@@ -130,7 +137,8 @@ public T build() {
                 threadFactory,
                 statsLogger,
                 traceTaskExecution,
-                warnTimeMicroSec);
+                warnTimeMicroSec,
+                maxTasksInQueue);
         }
 
     }
@@ -179,21 +187,24 @@ protected OrderedScheduler(String baseName,
                                ThreadFactory threadFactory,
                                StatsLogger statsLogger,
                                boolean traceTaskExecution,
-                               long warnTimeMicroSec) {
+                               long warnTimeMicroSec,
+                               int maxTasksInQueue) {
         checkArgument(numThreads > 0);
         checkArgument(!StringUtils.isBlank(baseName));
 
+        this.maxTasksInQueue = maxTasksInQueue;
         this.warnTimeMicroSec = warnTimeMicroSec;
         name = baseName;
         threads = new ListeningScheduledExecutorService[numThreads];
         threadIds = new long[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            final ScheduledThreadPoolExecutor thread =  new ScheduledThreadPoolExecutor(1,
+            final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1,
                     new ThreadFactoryBuilder()
                         .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
                         .setThreadFactory(threadFactory)
                         .build());
-            threads[i] = MoreExecutors.listeningDecorator(thread);
+            threads[i] = new BoundedScheduledExecutorService(thread, this.maxTasksInQueue);
+
             final int idx = i;
             try {
                 threads[idx].submit(new SafeRunnable() {
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index b43e69148..38ed3c5d6 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -45,6 +45,7 @@ enum StatusCode {
     EBADVERSION = 503;
     EFENCED = 504;
     EREADONLY = 505;
+    ETOOMANYREQUESTS = 506;
 }
 
 /**
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index 11cbff271..b4f37e441 100755
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -147,6 +147,14 @@ journalDirectory=/tmp/bk-txn
 # be handled by netty threads directly.
 # numReadWorkerThreads=1
 
+# If read workers threads are enabled, limit the number of pending requests, to
+# avoid the executor queue to grow indefinitely
+# maxPendingReadRequestsPerThread=10000
+
+# If add workers threads are enabled, limit the number of pending requests, to
+# avoid the executor queue to grow indefinitely
+# maxPendingAddRequestsPerThread=10000
+
 # Whether force compaction is allowed when the disk is full or almost full.
 # Forcing GC may get some space back, but may also fill up disk space more quickly.
 # This is because new log files are created before GC, while old garbage
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index ea60a720a..5ff664106 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -83,6 +83,8 @@ public static BKException create(int code) {
             return new BKUnclosedFragmentException();
         case Code.WriteOnReadOnlyBookieException:
             return new BKWriteOnReadOnlyBookieException();
+        case Code.TooManyRequestsException:
+            return new BKTooManyRequestsException();
         case Code.ReplicationException:
             return new BKReplicationException();
         case Code.ClientClosedException:
@@ -280,6 +282,12 @@ public BKWriteOnReadOnlyBookieException() {
         }
     }
 
+    public static class BKTooManyRequestsException extends BKException {
+        public BKTooManyRequestsException() {
+            super(Code.TooManyRequestsException);
+        }
+    }
+
     public static class BKReplicationException extends BKException {
         public BKReplicationException() {
             super(Code.ReplicationException);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
index 737509db8..cf7eda2d3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -214,7 +214,10 @@ public static String getMessage(int code) {
          * Write operations failed due to bookies are readonly.
          */
         int WriteOnReadOnlyBookieException = -104;
-        //-105 reserved for TooManyRequestsException
+        /**
+         * Operations failed due to too many requests in the queue.
+         */
+        int TooManyRequestsException = -105;
         /**
          * Ledger id overflow happens on ledger manager.
          *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 8dcb1b320..ad7002b6c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -127,6 +127,8 @@
     // Worker Thread parameters.
     protected final static String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
     protected final static String NUM_READ_WORKER_THREADS = "numReadWorkerThreads";
+    protected final static String MAX_PENDING_READ_REQUESTS_PER_THREAD = "maxPendingReadRequestsPerThread";
+    protected final static String MAX_PENDING_ADD_REQUESTS_PER_THREAD = "maxPendingAddRequestsPerThread";
     protected final static String NUM_LONG_POLL_WORKER_THREADS = "numLongPollWorkerThreads";
 
     // Long poll parameters
@@ -1350,6 +1352,48 @@ public ServerConfiguration setRequestTimerTickDurationMs(int tickDuration) {
     }
 
     /**
+     * Set the max number of pending read requests for each read worker thread. After the quota is reached, new requests
+     * will be failed immediately
+     *
+     * @param maxPendingReadRequestsPerThread
+     * @return server configuration
+     */
+    public ServerConfiguration setMaxPendingReadRequestPerThread(int maxPendingReadRequestsPerThread) {
+        setProperty(MAX_PENDING_READ_REQUESTS_PER_THREAD, maxPendingReadRequestsPerThread);
+        return this;
+    }
+
+    /**
+     * If read workers threads are enabled, limit the number of pending requests, to avoid the executor queue to grow
+     * indefinitely (default: 10000 entries)
+     */
+    public int getMaxPendingReadRequestPerThread() {
+        return getInt(MAX_PENDING_READ_REQUESTS_PER_THREAD, 10000);
+    }
+
+    /**
+     * Set the max number of pending add requests for each add worker thread. After the quota is reached, new requests
+     * will be failed immediately
+     *
+     * @param maxPendingAddRequestsPerThread
+     * @return server configuration
+     */
+    public ServerConfiguration setMaxPendingAddRequestPerThread(int maxPendingAddRequestsPerThread) {
+        setProperty(MAX_PENDING_ADD_REQUESTS_PER_THREAD, maxPendingAddRequestsPerThread);
+        return this;
+    }
+
+    /**
+     * If add workers threads are enabled, limit the number of pending requests, to avoid the executor queue to grow
+     * indefinitely (default: 10000 entries)
+     */
+    public int getMaxPendingAddRequestPerThread() {
+        return getInt(MAX_PENDING_ADD_REQUESTS_PER_THREAD, 10000);
+    }
+
+
+
+    /**
      * Get the tick duration in milliseconds.
      * @return
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 2c7a82835..879489ad3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -175,6 +175,11 @@ public static short getFlags(int packetHeader) {
      */
     public static final int EREADONLY = 105;
 
+    /**
+     * Too many concurrent requests
+     */
+    public static final int ETOOMANYREQUESTS = 106;
+
     public static final short FLAG_NONE = 0x0;
     public static final short FLAG_DO_FENCING = 0x0001;
     public static final short FLAG_RECOVERY_ADD = 0x0002;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 81208f3d1..7c5114350 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -28,10 +28,12 @@
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.HashedWheelTimer;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.Counter;
@@ -135,12 +137,14 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
             StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
         this.serverCfg = serverCfg;
         this.bookie = bookie;
-        this.readThreadPool = createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + serverCfg.getBookiePort());
-        this.writeThreadPool = createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + serverCfg.getBookiePort());
+        this.readThreadPool = createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + serverCfg.getBookiePort(),
+                serverCfg.getMaxPendingReadRequestPerThread());
+        this.writeThreadPool = createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + serverCfg.getBookiePort(),
+                serverCfg.getMaxPendingAddRequestPerThread());
         this.longPollThreadPool =
             createExecutor(
                 this.serverCfg.getNumLongPollWorkerThreads(),
-                "BookieLongPollThread-" + serverCfg.getBookiePort());
+                "BookieLongPollThread-" + serverCfg.getBookiePort(), OrderedScheduler.NO_TASK_LIMIT);
         this.requestTimer = new HashedWheelTimer(
             new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
             this.serverCfg.getRequestTimerTickDurationMs(),
@@ -180,11 +184,11 @@ public void close() {
         shutdownExecutor(readThreadPool);
     }
 
-    private OrderedSafeExecutor createExecutor(int numThreads, String nameFormat) {
+    private OrderedSafeExecutor createExecutor(int numThreads, String nameFormat, int maxTasksInQueue) {
         if (numThreads <= 0) {
             return null;
         } else {
-            return OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat).build();
+            return OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat).maxTasksInQueue(maxTasksInQueue).build();
         }
     }
 
@@ -288,7 +292,24 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann
         if (null == writeThreadPool) {
             write.run();
         } else {
-            writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), write);
+            try {
+                writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), write);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests",
+                            r.getAddRequest().getLedgerId(), r.getAddRequest().getEntryId());
+                }
+                BookkeeperProtocol.AddResponse.Builder addResponse = BookkeeperProtocol.AddResponse.newBuilder()
+                        .setLedgerId(r.getAddRequest().getLedgerId())
+                        .setEntryId(r.getAddRequest().getEntryId())
+                        .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
+                BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder()
+                        .setHeader(write.getHeader())
+                        .setStatus(addResponse.getStatus())
+                        .setAddResponse(addResponse);
+                BookkeeperProtocol.Response resp = response.build();
+                write.sendResponse(addResponse.getStatus(), resp, addRequestStats);
+            }
         }
     }
 
@@ -316,7 +337,25 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan
             if (null == readThreadPool) {
                 read.run();
             } else {
-                readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+                try {
+                    readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+                } catch (RejectedExecutionException e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests",
+                                r.getReadRequest().getLedgerId(), r.getReadRequest().getEntryId());
+                    }
+                    BookkeeperProtocol.ReadResponse.Builder readResponse =
+                            BookkeeperProtocol.ReadResponse.newBuilder()
+                                    .setLedgerId(r.getAddRequest().getLedgerId())
+                                    .setEntryId(r.getAddRequest().getEntryId())
+                                    .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
+                    BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder()
+                            .setHeader(read.getHeader())
+                            .setStatus(readResponse.getStatus())
+                            .setReadResponse(readResponse);
+                    BookkeeperProtocol.Response resp = response.build();
+                    read.sendResponse(readResponse.getStatus(), resp, readRequestStats);
+                }
             }
         }
     }
@@ -378,7 +417,17 @@ private void processAddRequest(final BookieProtocol.Request r, final Channel c)
         if (null == writeThreadPool) {
             write.run();
         } else {
-            writeThreadPool.submitOrdered(r.getLedgerId(), write);
+            try {
+                writeThreadPool.submitOrdered(r.getLedgerId(), write);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", r.ledgerId,
+                            r.entryId);
+                }
+
+                write.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
+                        ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), addRequestStats);
+            }
         }
     }
 
@@ -387,7 +436,17 @@ private void processReadRequest(final BookieProtocol.Request r, final Channel c)
         if (null == readThreadPool) {
             read.run();
         } else {
-            readThreadPool.submitOrdered(r.getLedgerId(), read);
+            try {
+                readThreadPool.submitOrdered(r.getLedgerId(), read);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", r.ledgerId,
+                            r.entryId);
+                }
+
+                read.sendResponse(BookieProtocol.ETOOMANYREQUESTS,
+                        ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), readRequestStats);
+            }
         }
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 0c8f25ad9..c392c794f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1040,6 +1040,8 @@ private StatusCode getStatusCodeFromErrorCode(int errorCode) {
                 return StatusCode.EFENCED;
             case BookieProtocol.EREADONLY:
                 return StatusCode.EREADONLY;
+            case BookieProtocol.ETOOMANYREQUESTS:
+                return StatusCode.ETOOMANYREQUESTS;
             default:
                 throw new IllegalArgumentException("Invalid error code: " + errorCode);
         }
@@ -1771,6 +1773,9 @@ private Integer statusCodeToExceptionCode(StatusCode status) {
             case EREADONLY:
                 rcToRet = BKException.Code.WriteOnReadOnlyBookieException;
                 break;
+            case ETOOMANYREQUESTS:
+                rcToRet = BKException.Code.TooManyRequestsException;
+                break;
             default:
                 break;
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index d832d1878..8df8d5b8a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -58,7 +58,7 @@ public OrderedSafeExecutor build() {
                 threadFactory = Executors.defaultThreadFactory();
             }
             return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger,
-                                           traceTaskExecution, warnTimeMicroSec);
+                                           traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
         }
 
     }
@@ -78,11 +78,13 @@ public OrderedSafeExecutor build() {
      *            - should we stat task execution
      * @param warnTimeMicroSec
      *            - log long task exec warning after this interval
+     * @param maxTasksInQueue
+     *            - maximum items allowed in a thread queue. -1 for no limit
      */
     private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
                                 StatsLogger statsLogger, boolean traceTaskExecution,
-                                long warnTimeMicroSec) {
-        super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec);
+                                long warnTimeMicroSec, int maxTasksInQueue) {
+        super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue);
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
new file mode 100644
index 000000000..49617ec8f
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+
+public class TestMaxSizeWorkersQueue extends BookKeeperClusterTestCase {
+    DigestType digestType = DigestType.CRC32;
+
+    public TestMaxSizeWorkersQueue() {
+        super(1);
+
+        baseConf.setNumReadWorkerThreads(1);
+        baseConf.setNumAddWorkerThreads(1);
+
+        // Configure very small queue sizes
+        baseConf.setMaxPendingReadRequestPerThread(1);
+        baseConf.setMaxPendingAddRequestPerThread(1);
+    }
+
+    @Test(timeout = 60000)
+    public void testReadRejected() throws Exception {
+        LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
+        byte[] content = new byte[100];
+
+        final int N = 1000;
+        // Write few entries
+        for (int i = 0; i < N; i++) {
+            lh.addEntry(content);
+        }
+
+        // Read asynchronously:
+        // - 1st read must always succeed
+        // - Subsequent reads may fail, depending on timing
+        // - At least few, we expect to fail with TooManyRequestException
+        final CountDownLatch counter = new CountDownLatch(2);
+
+        final AtomicInteger rcFirstReadOperation = new AtomicInteger();
+
+        lh.asyncReadEntries(0, 0, new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+                rcFirstReadOperation.set(rc);
+                counter.countDown();
+            }
+        }, lh);
+
+        final AtomicInteger rcSecondReadOperation = new AtomicInteger();
+
+        lh.asyncReadEntries(0, N - 1, new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+                rcSecondReadOperation.set(rc);
+                counter.countDown();
+            }
+        }, lh);
+
+        counter.await();
+
+        assertEquals(BKException.Code.OK, rcFirstReadOperation.get());
+        assertEquals(BKException.Code.TooManyRequestsException, rcSecondReadOperation.get());
+    }
+
+    @Test(timeout = 60000)
+    public void testAddRejected() throws Exception {
+        LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
+        byte[] content = new byte[100];
+
+        final int N = 1000;
+
+        // Write asynchronously, and expect at least few writes to have failed with NotEnoughBookies,
+        // because when we get the TooManyRequestException, the client will try to form a new ensemble and that
+        // operation will fail since we only have 1 bookie available
+        final CountDownLatch counter = new CountDownLatch(N);
+        final AtomicBoolean receivedTooManyRequestsException = new AtomicBoolean();
+
+        // Write few entries
+        for (int i = 0; i < N; i++) {
+            lh.asyncAddEntry(content, new AddCallback() {
+                @Override
+                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                    if (rc == BKException.Code.NotEnoughBookiesException) {
+                        receivedTooManyRequestsException.set(true);
+                    }
+
+                    counter.countDown();
+                }
+            }, null);
+        }
+
+        counter.await();
+
+        assertTrue(receivedTooManyRequestsException.get());
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services