You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/04/04 00:15:40 UTC
[bookkeeper] branch master updated: longPollThreadPool can not be
null
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6bb5bd0 longPollThreadPool can not be null
6bb5bd0 is described below
commit 6bb5bd09ab75b66c71827d07eb94c47ab8867c0f
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Apr 3 17:15:32 2018 -0700
longPollThreadPool can not be null
Descriptions of the changes in this PR:
*Problem*
The long polling logic is built with the assumption that there is a thread pool for scheduling deferred reads.
So if someone happens to set `numLongPollWorkerThreads` to zero or a negative value, a null value is passed into long poll requests, which causes an NPE.
*Solution*
If `numLongPollWorkerThreads` is set to zero or a negative value, fall back to using the read thread pool. If there is no read thread pool, create a thread pool
sized to the number of available processors.
With this change, the default value of `numLongPollWorkerThreads` becomes zero, so no additional threads are needed if the long poll feature is not used (e.g. in Pulsar).
Author: Sijie Guo <si...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Yiming Zang <yz...@gmail.com>, Matteo Merli <mm...@apache.org>
This closes #1308 from sijie/validate_number_long_poll_threads
---
.../bookkeeper/conf/ServerConfiguration.java | 9 +++--
.../bookkeeper/processor/RequestProcessor.java | 2 +-
.../bookkeeper/proto/BookieRequestProcessor.java | 22 +++++++++---
.../proto/TestBookieRequestProcessor.java | 39 ++++++++++++++++++++++
conf/bk_server.conf | 2 +-
site/_data/config/bk_server.yaml | 2 +-
6 files changed, 67 insertions(+), 9 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 2aaea61..e9950fd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -1288,10 +1288,15 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
/**
* Get the number of threads that should handle long poll requests.
- * @return
+ *
+ * <p>If the number of threads is zero or negative, bookie will fallback to
+ * use read threads. If there is no read threads used, it will create a thread pool
+ * with {@link Runtime#availableProcessors()} threads.
+ *
+ * @return the number of threads that should handle long poll requests, default value is 0.
*/
public int getNumLongPollWorkerThreads() {
- return getInt(NUM_LONG_POLL_WORKER_THREADS, 10);
+ return getInt(NUM_LONG_POLL_WORKER_THREADS, 0);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
index 19f095c..2c8cf7a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
@@ -25,7 +25,7 @@ import io.netty.channel.Channel;
/**
* A request processor that is used for processing requests at bookie side.
*/
-public interface RequestProcessor {
+public interface RequestProcessor extends AutoCloseable {
/**
* Close the request processor.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 7e65ced..4102e75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import lombok.AccessLevel;
+import lombok.Getter;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.bookie.Bookie;
@@ -92,6 +94,7 @@ public class BookieRequestProcessor implements RequestProcessor {
/**
* The threadpool used to execute all read entry requests issued to this server.
*/
+ @Getter(AccessLevel.PACKAGE)
private final OrderedExecutor readThreadPool;
/**
@@ -108,6 +111,7 @@ public class BookieRequestProcessor implements RequestProcessor {
* The threadpool used to execute all long poll requests issued to this server
* after they are done waiting.
*/
+ @Getter(AccessLevel.PACKAGE)
private final OrderedExecutor longPollThreadPool;
/**
@@ -158,10 +162,18 @@ public class BookieRequestProcessor implements RequestProcessor {
"BookieWriteThreadPool",
serverCfg.getMaxPendingAddRequestPerThread(),
statsLogger);
- this.longPollThreadPool = createExecutor(
- this.serverCfg.getNumLongPollWorkerThreads(),
+ if (serverCfg.getNumLongPollWorkerThreads() <= 0 && readThreadPool != null) {
+ this.longPollThreadPool = this.readThreadPool;
+ } else {
+ int numThreads = this.serverCfg.getNumLongPollWorkerThreads();
+ if (numThreads <= 0) {
+ numThreads = Runtime.getRuntime().availableProcessors();
+ }
+ this.longPollThreadPool = createExecutor(
+ numThreads,
"BookieLongPollThread-" + serverCfg.getBookiePort(),
OrderedExecutor.NO_TASK_LIMIT, statsLogger);
+ }
this.highPriorityThreadPool = createExecutor(
this.serverCfg.getNumHighPriorityWorkerThreads(),
"BookieHighPriorityThread-" + serverCfg.getBookiePort(),
@@ -203,7 +215,9 @@ public class BookieRequestProcessor implements RequestProcessor {
public void close() {
shutdownExecutor(writeThreadPool);
shutdownExecutor(readThreadPool);
- shutdownExecutor(longPollThreadPool);
+ if (serverCfg.getNumLongPollWorkerThreads() > 0 || readThreadPool == null) {
+ shutdownExecutor(longPollThreadPool);
+ }
shutdownExecutor(highPriorityThreadPool);
}
@@ -362,7 +376,7 @@ public class BookieRequestProcessor implements RequestProcessor {
final ReadEntryProcessorV3 read;
final OrderedExecutor threadPool;
if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
- ExecutorService lpThread = null == longPollThreadPool ? null : longPollThreadPool.chooseThread(c);
+ ExecutorService lpThread = longPollThreadPool.chooseThread(c);
read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread,
lpThread, requestTimer);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index c7601d0..b212e49 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -21,17 +21,56 @@
package org.apache.bookkeeper.proto;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Test;
/**
* Test utility methods from bookie request processor.
*/
public class TestBookieRequestProcessor {
+
+ @Test
+ public void testConstructLongPollThreads() throws Exception {
+ // long poll threads == read threads
+ ServerConfiguration conf = new ServerConfiguration();
+ try (BookieRequestProcessor processor = new BookieRequestProcessor(
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+ assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
+ }
+
+ // force create long poll threads if there is no read threads
+ conf = new ServerConfiguration();
+ conf.setNumReadWorkerThreads(0);
+ try (BookieRequestProcessor processor = new BookieRequestProcessor(
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+ assertNull(processor.getReadThreadPool());
+ assertNotNull(processor.getLongPollThreadPool());
+ }
+
+ // long poll threads and no read threads
+ conf = new ServerConfiguration();
+ conf.setNumReadWorkerThreads(2);
+ conf.setNumLongPollWorkerThreads(2);
+ try (BookieRequestProcessor processor = new BookieRequestProcessor(
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+ assertNotNull(processor.getReadThreadPool());
+ assertNotNull(processor.getLongPollThreadPool());
+ assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
+ }
+ }
+
@Test
public void testFlagsV3() {
ReadRequest read = ReadRequest.newBuilder()
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 8c294b9..8016a78 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -107,7 +107,7 @@ bookiePort=3181
# numReadWorkerThreads=8
# The number of threads that should handle long poll requests.
-# numLongPollWorkerThreads=10
+# numLongPollWorkerThreads=0
# The number of threads used for handling journal callback. If a zero or negative number is provided,
# the callbacks are executed directly at force write threads.
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 6eeb329..ce45f20 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -55,7 +55,7 @@ groups:
default: 8
- param: numLongPollWorkerThreads
description: The number of threads that handle long poll requests. If zero, long poll requests are handled by [Netty threads](//netty.io/wiki/thread-model.html) directly.
- default: 10
+ default: 0
- param: numJournalCallbackThreads
description: The number of threads that handle journal callbacks. If zero, journal callbacks are executed directly on force write threads.
default: 1
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.