You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/04 01:33:22 UTC
[pulsar] branch master updated: [improve][broker] Able to use separated IO threads for BookKeeper Client (#16333)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 904c91f4c92 [improve][broker] Able to use separated IO threads for BookKeeper Client (#16333)
904c91f4c92 is described below
commit 904c91f4c92b4307b8acbce4538199e8430c9bc3
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jul 4 09:33:15 2022 +0800
[improve][broker] Able to use separated IO threads for BookKeeper Client (#16333)
### Motivation
The broker always uses the broker IO thread pool as the BookKeeper client IO thread pool.
This PR provides the ability to use a separate IO thread pool for the BookKeeper client.
### Modification
Introduces two new configuration options; the default behavior is unchanged.
```
# Number of BookKeeper client IO threads
# Default is Runtime.getRuntime().availableProcessors() * 2
bookkeeperClientNumIoThreads=
# Use separated IO threads for BookKeeper client
# Default is false, which will use Pulsar IO threads
bookkeeperClientSeparatedIoThreadsEnabled=false
```
---
conf/broker.conf | 8 ++++++++
conf/standalone.conf | 8 ++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 11 +++++++++++
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 19 +++++++++++++-----
.../broker/BookKeeperClientFactoryImplTest.java | 23 ++++++++++++++++++++++
.../common/naming/ServiceConfigurationTest.java | 16 +++++++++++++++
site2/docs/reference-configuration.md | 2 ++
7 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 91e20059296..d93811d04a7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -845,6 +845,14 @@ bookkeeperClientTimeoutInSeconds=30
# Default is Runtime.getRuntime().availableProcessors()
bookkeeperClientNumWorkerThreads=
+# Number of BookKeeper client IO threads
+# Default is Runtime.getRuntime().availableProcessors() * 2
+bookkeeperClientNumIoThreads=
+
+# Use separated IO threads for BookKeeper client
+# Default is false, which will use Pulsar IO threads
+bookkeeperClientSeparatedIoThreadsEnabled=false
+
# Speculative reads are initiated if a read request doesn't complete within a certain time
# Using a value of 0, is disabling the speculative reads
bookkeeperClientSpeculativeReadTimeoutInMillis=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a47692ef454..7796e3e41e8 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -546,6 +546,14 @@ bookkeeperClientTimeoutInSeconds=30
# Default is Runtime.getRuntime().availableProcessors()
bookkeeperClientNumWorkerThreads=
+# Number of BookKeeper client IO threads
+# Default is Runtime.getRuntime().availableProcessors() * 2
+bookkeeperClientNumIoThreads=
+
+# Use separated IO threads for BookKeeper client
+# Default is false, which will use Pulsar IO threads
+bookkeeperClientSeparatedIoThreadsEnabled=false
+
# Speculative reads are initiated if a read request doesn't complete within a certain time
# Using a value of 0, is disabling the speculative reads
bookkeeperClientSpeculativeReadTimeoutInMillis=0
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fd138abfce6..43ef83f54ed 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1683,6 +1683,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int bookkeeperClientNumWorkerThreads = Runtime.getRuntime().availableProcessors();
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Number of BookKeeper client IO threads. Default is Runtime.getRuntime().availableProcessors() * 2"
+ )
+ private int bookkeeperClientNumIoThreads = Runtime.getRuntime().availableProcessors() * 2;
+
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Use separated IO threads for BookKeeper client. Default is false, which will use Pulsar IO threads"
+ )
+ private boolean bookkeeperClientSeparatedIoThreadsEnabled = false;
/**** --- Managed Ledger. --- ****/
@FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 62fdd20b1ac..f3d69202a08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -80,16 +80,24 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
setDefaultEnsemblePlacementPolicy(bkConf, conf, store);
}
try {
- return BookKeeper.forConfig(bkConf)
- .allocator(PulsarByteBufAllocator.DEFAULT)
- .eventLoopGroup(eventLoopGroup)
- .statsLogger(statsLogger)
- .build();
+ return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, bkConf).build();
} catch (InterruptedException | BKException e) {
throw new IOException(e);
}
}
+ @VisibleForTesting
+ BookKeeper.Builder getBookKeeperBuilder(ServiceConfiguration conf, EventLoopGroup eventLoopGroup,
+ StatsLogger statsLogger, ClientConfiguration bkConf) {
+ BookKeeper.Builder builder = BookKeeper.forConfig(bkConf)
+ .allocator(PulsarByteBufAllocator.DEFAULT)
+ .statsLogger(statsLogger);
+ if (!conf.isBookkeeperClientSeparatedIoThreadsEnabled()) {
+ builder.eventLoopGroup(eventLoopGroup);
+ }
+ return builder;
+ }
+
@VisibleForTesting
ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, ServiceConfiguration conf) {
ClientConfiguration bkConf = new ClientConfiguration();
@@ -150,6 +158,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
bkConf.setGetBookieInfoRetryIntervalSeconds(
conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
+ bkConf.setNumIOThreads(conf.getBookkeeperClientNumIoThreads());
PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_")
.forEach((key, value) -> {
log.info("Applying BookKeeper client configuration setting {}={}", key, value);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
index 0cef2fc45c7..e26b0aa7561 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
@@ -31,12 +31,16 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import io.netty.channel.EventLoopGroup;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.powermock.reflect.Whitebox;
import org.testng.annotations.Test;
/**
@@ -276,4 +280,23 @@ public class BookKeeperClientFactoryImplTest {
}
+ @Test
+ public void testBookKeeperIoThreadsConfiguration() {
+ BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
+ ServiceConfiguration conf = new ServiceConfiguration();
+ assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)
+ .getNumIOThreads(), Runtime.getRuntime().availableProcessors() * 2);
+ conf.setBookkeeperClientNumIoThreads(1);
+ assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)
+ .getNumIOThreads(), 1);
+ EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+ BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
+ mock(StatsLogger.class), mock(ClientConfiguration.class));
+ assertEquals(Whitebox.getInternalState(builder, "eventLoopGroup"), eventLoopGroup);
+ conf.setBookkeeperClientSeparatedIoThreadsEnabled(true);
+ builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
+ mock(StatsLogger.class), mock(ClientConfiguration.class));
+ assertNull(Whitebox.getInternalState(builder, "eventLoopGroup"));
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index fc3fbc8981b..97202866fb5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -237,4 +237,20 @@ public class ServiceConfigurationTest {
}
}
+
+ @Test
+ public void testBookKeeperClientIoThreads() throws Exception {
+ try (FileInputStream stream = new FileInputStream("../conf/broker.conf")) {
+ final ServiceConfiguration fileConfig = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+ assertFalse(fileConfig.isBookkeeperClientSeparatedIoThreadsEnabled());
+ assertEquals(fileConfig.getBookkeeperClientNumIoThreads(), Runtime.getRuntime().availableProcessors() * 2);
+ }
+ String confFile = "bookkeeperClientNumIoThreads=1\n" +
+ "bookkeeperClientSeparatedIoThreadsEnabled=true\n";
+ try (InputStream stream = new ByteArrayInputStream(confFile.getBytes())) {
+ final ServiceConfiguration conf = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+ assertTrue(conf.isBookkeeperClientSeparatedIoThreadsEnabled());
+ assertEquals(conf.getBookkeeperClientNumIoThreads(), 1);
+ }
+ }
}
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index ab20f653dde..b956314a53c 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -269,6 +269,8 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
|bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementation specifics parameters name and values ||
|bookkeeperClientAuthenticationParameters|||
|bookkeeperClientNumWorkerThreads| Number of BookKeeper client worker threads. Default is Runtime.getRuntime().availableProcessors() ||
+|bookkeeperClientNumIoThreads| Number of BookKeeper client IO threads. Default is Runtime.getRuntime().availableProcessors() * 2 ||
+|bookkeeperClientSeparatedIoThreadsEnabled| Use separated IO threads for BookKeeper client. Default is false, which will use Pulsar IO threads ||
|bookkeeperClientTimeoutInSeconds| Timeout for BK add / read operations |30|
|bookkeeperClientSpeculativeReadTimeoutInMillis| Speculative reads are initiated if a read request doesn’t complete within a certain time Using a value of 0, is disabling the speculative reads |0|
|bookkeeperNumberOfChannelsPerBookie| Number of channels per bookie |16|