You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/04/04 00:15:40 UTC

[bookkeeper] branch master updated: longPollThreadPool can not be null

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bb5bd0  longPollThreadPool can not be null
6bb5bd0 is described below

commit 6bb5bd09ab75b66c71827d07eb94c47ab8867c0f
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Apr 3 17:15:32 2018 -0700

    longPollThreadPool can not be null
    
    Descriptions of the changes in this PR:
    
    *Problem*
    
    The long polling logic is built with the assumption that there is a thread pool for scheduling deferred reads.
    So if someone happens to set `numLongPollWorkerThreads` to zero or a negative value, a null value is passed into long poll requests, which causes an NPE.
    
    *Solution*
    
    If `numLongPollWorkerThreads` is set to zero or a negative value, fall back to using the read thread pool. If there is no read thread pool, create a thread pool
    sized to the number of available processors.
    
    With this change, the default value of `numLongPollWorkerThreads` is changed to zero, so no additional threads are needed if the long poll feature is not used (e.g. in Pulsar).
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Yiming Zang <yz...@gmail.com>, Matteo Merli <mm...@apache.org>
    
    This closes #1308 from sijie/validate_number_long_poll_threads
---
 .../bookkeeper/conf/ServerConfiguration.java       |  9 +++--
 .../bookkeeper/processor/RequestProcessor.java     |  2 +-
 .../bookkeeper/proto/BookieRequestProcessor.java   | 22 +++++++++---
 .../proto/TestBookieRequestProcessor.java          | 39 ++++++++++++++++++++++
 conf/bk_server.conf                                |  2 +-
 site/_data/config/bk_server.yaml                   |  2 +-
 6 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 2aaea61..e9950fd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -1288,10 +1288,15 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
 
     /**
      * Get the number of threads that should handle long poll requests.
-     * @return
+     *
+     * <p>If the number of threads is zero or negative, bookie will fallback to
+     * use read threads. If there is no read threads used, it will create a thread pool
+     * with {@link Runtime#availableProcessors()} threads.
+     *
+     * @return the number of threads that should handle long poll requests, default value is 0.
      */
     public int getNumLongPollWorkerThreads() {
-        return getInt(NUM_LONG_POLL_WORKER_THREADS, 10);
+        return getInt(NUM_LONG_POLL_WORKER_THREADS, 0);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 19f095c..2c8cf7a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -25,7 +25,7 @@ import io.netty.channel.Channel;
 /**
  * A request processor that is used for processing requests at bookie side.
  */
-public interface RequestProcessor {
+public interface RequestProcessor extends AutoCloseable {
 
     /**
      * Close the request processor.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 7e65ced..4102e75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import lombok.AccessLevel;
+import lombok.Getter;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -92,6 +94,7 @@ public class BookieRequestProcessor implements RequestProcessor {
     /**
      * The threadpool used to execute all read entry requests issued to this server.
      */
+    @Getter(AccessLevel.PACKAGE)
     private final OrderedExecutor readThreadPool;
 
     /**
@@ -108,6 +111,7 @@ public class BookieRequestProcessor implements RequestProcessor {
      * The threadpool used to execute all long poll requests issued to this server
      * after they are done waiting.
      */
+    @Getter(AccessLevel.PACKAGE)
     private final OrderedExecutor longPollThreadPool;
 
     /**
@@ -158,10 +162,18 @@ public class BookieRequestProcessor implements RequestProcessor {
                 "BookieWriteThreadPool",
                 serverCfg.getMaxPendingAddRequestPerThread(),
                 statsLogger);
-        this.longPollThreadPool = createExecutor(
-                this.serverCfg.getNumLongPollWorkerThreads(),
+        if (serverCfg.getNumLongPollWorkerThreads() <= 0 && readThreadPool != null) {
+            this.longPollThreadPool = this.readThreadPool;
+        } else {
+            int numThreads = this.serverCfg.getNumLongPollWorkerThreads();
+            if (numThreads <= 0) {
+                numThreads = Runtime.getRuntime().availableProcessors();
+            }
+            this.longPollThreadPool = createExecutor(
+                numThreads,
                 "BookieLongPollThread-" + serverCfg.getBookiePort(),
                 OrderedExecutor.NO_TASK_LIMIT, statsLogger);
+        }
         this.highPriorityThreadPool = createExecutor(
                 this.serverCfg.getNumHighPriorityWorkerThreads(),
                 "BookieHighPriorityThread-" + serverCfg.getBookiePort(),
@@ -203,7 +215,9 @@ public class BookieRequestProcessor implements RequestProcessor {
     public void close() {
         shutdownExecutor(writeThreadPool);
         shutdownExecutor(readThreadPool);
-        shutdownExecutor(longPollThreadPool);
+        if (serverCfg.getNumLongPollWorkerThreads() > 0 || readThreadPool == null) {
+            shutdownExecutor(longPollThreadPool);
+        }
         shutdownExecutor(highPriorityThreadPool);
     }
 
@@ -362,7 +376,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         final ReadEntryProcessorV3 read;
         final OrderedExecutor threadPool;
         if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
-            ExecutorService lpThread = null == longPollThreadPool ? null : longPollThreadPool.chooseThread(c);
+            ExecutorService lpThread = longPollThreadPool.chooseThread(c);
 
             read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread,
                                                     lpThread, requestTimer);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index c7601d0..b212e49 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -21,17 +21,56 @@
 package org.apache.bookkeeper.proto;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.Test;
 
 /**
  * Test utility methods from bookie request processor.
  */
 public class TestBookieRequestProcessor {
+
+    @Test
+    public void testConstructLongPollThreads() throws Exception {
+        // long poll threads == read threads
+        ServerConfiguration conf = new ServerConfiguration();
+        try (BookieRequestProcessor processor = new BookieRequestProcessor(
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
+        }
+
+        // force create long poll threads if there is no read threads
+        conf = new ServerConfiguration();
+        conf.setNumReadWorkerThreads(0);
+        try (BookieRequestProcessor processor = new BookieRequestProcessor(
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            assertNull(processor.getReadThreadPool());
+            assertNotNull(processor.getLongPollThreadPool());
+        }
+
+        // long poll threads and no read threads
+        conf = new ServerConfiguration();
+        conf.setNumReadWorkerThreads(2);
+        conf.setNumLongPollWorkerThreads(2);
+        try (BookieRequestProcessor processor = new BookieRequestProcessor(
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            assertNotNull(processor.getReadThreadPool());
+            assertNotNull(processor.getLongPollThreadPool());
+            assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
+        }
+    }
+
     @Test
     public void testFlagsV3() {
         ReadRequest read = ReadRequest.newBuilder()
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 8c294b9..8016a78 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -107,7 +107,7 @@ bookiePort=3181
 # numReadWorkerThreads=8
 
 # The number of threads that should handle long poll requests.
-# numLongPollWorkerThreads=10
+# numLongPollWorkerThreads=0
 
 # The number of threads used for handling journal callback. If a zero or negative number is provided,
 # the callbacks are executed directly at force write threads.
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 6eeb329..ce45f20 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -55,7 +55,7 @@ groups:
     default: 8
   - param: numLongPollWorkerThreads
     description: The number of threads that handle long poll requests. If zero, long poll requests are handled by [Netty threads](//netty.io/wiki/thread-model.html) directly.
-    default: 10
+    default: 0
   - param: numJournalCallbackThreads
     description: The number of threads that handle journal callbacks. If zero, journal callbacks are executed directly on force write threads.
     default: 1

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.