You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/04 01:33:22 UTC

[pulsar] branch master updated: [improve][broker] Able to use separated IO threads for BookKeeper Client (#16333)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 904c91f4c92 [improve][broker] Able to use separated IO threads for BookKeeper Client (#16333)
904c91f4c92 is described below

commit 904c91f4c92b4307b8acbce4538199e8430c9bc3
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jul 4 09:33:15 2022 +0800

    [improve][broker] Able to use separated IO threads for BookKeeper Client (#16333)
    
    ### Motivation
    
    The broker always uses the broker IO thread pool as the BookKeeper Client IO thread pool.
    This PR provides the ability to use a separate IO thread pool for the BookKeeper Client.
    
    ### Modification
    
    Introduces two new configuration options; the default behavior is unchanged.
    
    ```
    # Number of BookKeeper client IO threads
    # Default is Runtime.getRuntime().availableProcessors() * 2
    bookkeeperClientNumIoThreads=
    
    # Use separated IO threads for BookKeeper client
    # Default is false, which will use Pulsar IO threads
    bookkeeperClientSeparatedIoThreadsEnabled=false
    ```
---
 conf/broker.conf                                   |  8 ++++++++
 conf/standalone.conf                               |  8 ++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 11 +++++++++++
 .../pulsar/broker/BookKeeperClientFactoryImpl.java | 19 +++++++++++++-----
 .../broker/BookKeeperClientFactoryImplTest.java    | 23 ++++++++++++++++++++++
 .../common/naming/ServiceConfigurationTest.java    | 16 +++++++++++++++
 site2/docs/reference-configuration.md              |  2 ++
 7 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 91e20059296..d93811d04a7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -845,6 +845,14 @@ bookkeeperClientTimeoutInSeconds=30
 # Default is Runtime.getRuntime().availableProcessors()
 bookkeeperClientNumWorkerThreads=
 
+# Number of BookKeeper client IO threads
+# Default is Runtime.getRuntime().availableProcessors() * 2
+bookkeeperClientNumIoThreads=
+
+# Use separated IO threads for BookKeeper client
+# Default is false, which will use Pulsar IO threads
+bookkeeperClientSeparatedIoThreadsEnabled=false
+
 # Speculative reads are initiated if a read request doesn't complete within a certain time
 # Using a value of 0, is disabling the speculative reads
 bookkeeperClientSpeculativeReadTimeoutInMillis=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a47692ef454..7796e3e41e8 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -546,6 +546,14 @@ bookkeeperClientTimeoutInSeconds=30
 # Default is Runtime.getRuntime().availableProcessors()
 bookkeeperClientNumWorkerThreads=
 
+# Number of BookKeeper client IO threads
+# Default is Runtime.getRuntime().availableProcessors() * 2
+bookkeeperClientNumIoThreads=
+
+# Use separated IO threads for BookKeeper client
+# Default is false, which will use Pulsar IO threads
+bookkeeperClientSeparatedIoThreadsEnabled=false
+
 # Speculative reads are initiated if a read request doesn't complete within a certain time
 # Using a value of 0, is disabling the speculative reads
 bookkeeperClientSpeculativeReadTimeoutInMillis=0
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fd138abfce6..43ef83f54ed 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1683,6 +1683,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int bookkeeperClientNumWorkerThreads = Runtime.getRuntime().availableProcessors();
 
+    @FieldContext(
+            category = CATEGORY_STORAGE_BK,
+            doc = "Number of BookKeeper client IO threads. Default is Runtime.getRuntime().availableProcessors() * 2"
+    )
+    private int bookkeeperClientNumIoThreads = Runtime.getRuntime().availableProcessors() * 2;
+
+    @FieldContext(
+            category = CATEGORY_STORAGE_BK,
+            doc = "Use separated IO threads for BookKeeper client. Default is false, which will use Pulsar IO threads"
+    )
+    private boolean bookkeeperClientSeparatedIoThreadsEnabled = false;
 
     /**** --- Managed Ledger. --- ****/
     @FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 62fdd20b1ac..f3d69202a08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -80,16 +80,24 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
             setDefaultEnsemblePlacementPolicy(bkConf, conf, store);
         }
         try {
-            return BookKeeper.forConfig(bkConf)
-                    .allocator(PulsarByteBufAllocator.DEFAULT)
-                    .eventLoopGroup(eventLoopGroup)
-                    .statsLogger(statsLogger)
-                    .build();
+            return getBookKeeperBuilder(conf, eventLoopGroup, statsLogger, bkConf).build();
         } catch (InterruptedException | BKException e) {
             throw new IOException(e);
         }
     }
 
+    @VisibleForTesting
+    BookKeeper.Builder getBookKeeperBuilder(ServiceConfiguration conf, EventLoopGroup eventLoopGroup,
+                                            StatsLogger statsLogger, ClientConfiguration bkConf) {
+        BookKeeper.Builder builder = BookKeeper.forConfig(bkConf)
+                .allocator(PulsarByteBufAllocator.DEFAULT)
+                .statsLogger(statsLogger);
+        if (!conf.isBookkeeperClientSeparatedIoThreadsEnabled()) {
+            builder.eventLoopGroup(eventLoopGroup);
+        }
+        return builder;
+    }
+
     @VisibleForTesting
     ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, ServiceConfiguration conf) {
         ClientConfiguration bkConf = new ClientConfiguration();
@@ -150,6 +158,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
                 conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
         bkConf.setGetBookieInfoRetryIntervalSeconds(
                 conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
+        bkConf.setNumIOThreads(conf.getBookkeeperClientNumIoThreads());
         PropertiesUtils.filterAndMapProperties(conf.getProperties(), "bookkeeper_")
                 .forEach((key, value) -> {
                     log.info("Applying BookKeeper client configuration setting {}={}", key, value);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
index 0cef2fc45c7..e26b0aa7561 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
@@ -31,12 +31,16 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import io.netty.channel.EventLoopGroup;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.powermock.reflect.Whitebox;
 import org.testng.annotations.Test;
 
 /**
@@ -276,4 +280,23 @@ public class BookKeeperClientFactoryImplTest {
 
     }
 
+    @Test
+    public void testBookKeeperIoThreadsConfiguration() {
+        BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
+        ServiceConfiguration conf = new ServiceConfiguration();
+        assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)
+                .getNumIOThreads(), Runtime.getRuntime().availableProcessors() * 2);
+        conf.setBookkeeperClientNumIoThreads(1);
+        assertEquals(factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)
+                .getNumIOThreads(), 1);
+        EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+        BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
+                mock(StatsLogger.class), mock(ClientConfiguration.class));
+        assertEquals(Whitebox.getInternalState(builder, "eventLoopGroup"), eventLoopGroup);
+        conf.setBookkeeperClientSeparatedIoThreadsEnabled(true);
+        builder = factory.getBookKeeperBuilder(conf, eventLoopGroup,
+                mock(StatsLogger.class), mock(ClientConfiguration.class));
+        assertNull(Whitebox.getInternalState(builder, "eventLoopGroup"));
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index fc3fbc8981b..97202866fb5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -237,4 +237,20 @@ public class ServiceConfigurationTest {
         }
 
     }
+
+    @Test
+    public void testBookKeeperClientIoThreads() throws Exception {
+        try (FileInputStream stream = new FileInputStream("../conf/broker.conf")) {
+            final ServiceConfiguration fileConfig = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+            assertFalse(fileConfig.isBookkeeperClientSeparatedIoThreadsEnabled());
+            assertEquals(fileConfig.getBookkeeperClientNumIoThreads(), Runtime.getRuntime().availableProcessors() * 2);
+        }
+        String confFile = "bookkeeperClientNumIoThreads=1\n" +
+                "bookkeeperClientSeparatedIoThreadsEnabled=true\n";
+        try (InputStream stream = new ByteArrayInputStream(confFile.getBytes())) {
+            final ServiceConfiguration conf = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
+            assertTrue(conf.isBookkeeperClientSeparatedIoThreadsEnabled());
+            assertEquals(conf.getBookkeeperClientNumIoThreads(), 1);
+        }
+    }
 }
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index ab20f653dde..b956314a53c 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -269,6 +269,8 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
 |bookkeeperClientAuthenticationParametersName|  BookKeeper auth plugin implementation specifics parameters name and values  ||
 |bookkeeperClientAuthenticationParameters|||
 |bookkeeperClientNumWorkerThreads|  Number of BookKeeper client worker threads. Default is Runtime.getRuntime().availableProcessors()  ||
+|bookkeeperClientNumIoThreads|  Number of BookKeeper client IO threads. Default is Runtime.getRuntime().availableProcessors() * 2  ||
+|bookkeeperClientSeparatedIoThreadsEnabled|  Use separated IO threads for BookKeeper client. Default is false, which will use Pulsar IO threads  ||
 |bookkeeperClientTimeoutInSeconds|  Timeout for BK add / read operations  |30|
 |bookkeeperClientSpeculativeReadTimeoutInMillis|  Speculative reads are initiated if a read request doesn’t complete within a certain time Using a value of 0, is disabling the speculative reads |0|
 |bookkeeperNumberOfChannelsPerBookie|  Number of channels per bookie  |16|