You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/29 11:45:26 UTC
[pulsar] branch branch-2.7 updated: Allow to configure BookKeeper
Opportunistic Striping Feature (and all BK client features using
bookkeeper_ prefix) (#9232)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new baddf9d Allow to configure BookKeeper Opportunistic Striping Feature (and all BK client features using bookkeeper_ prefix) (#9232)
baddf9d is described below
commit baddf9dfd0cd65742f09d4a2af165794fd9e9cdf
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon Feb 1 03:48:37 2021 +0100
Allow to configure BookKeeper Opportunistic Striping Feature (and all BK client features using bookkeeper_ prefix) (#9232)
BookKeeper 4.12 introduces the 'Opportunistic Striping' feature.
In BK terms 'striping' happens when EnsembleSize is greater than WriteQuorumSize, in this mode the entries are distributed round robin over a set of bookies, in order to achieve better performances as you can use the resources of more bookies.
For instance in a small HA cluster, with only 3 bookies, you must run Pulsar with 2-2-2 replication parameters (EnsembleSize=2,WriteQuorumSize=2,AckQuorumSize=2).
You cannot set EnsembleSize=3 (and thus use 'striping') because in case of temporary outage of a single bookie the BK client is not able to create an ensemble with 3 bookies.
With Opportunistic Striping you can use 3-2-2 and when the system is fully up-and-running with 3 bookies then you go with striping, but during single bookie outages (like during reconfigurations/updates) you fall back to 2-2-2.
This is not about consistency or durability, it is only about having the ability to get the most out of your bookkeeper cluster.
- Add a generic way to configure internal BookKeeper client options, any entry start starts with `bookkeeper_` is passed to the BK client configuration after stripping the prefix
- Add unit tests for `bookkeeper_opportunisticStriping` configuration option
- Add configuration example in broker.conf
- the change add new tests
(cherry picked from commit bc69ad29e01a005ef2dc3eae36aed429b42495aa)
---
conf/broker.conf | 7 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 1 +
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 20 ++++-
.../broker/BookKeeperClientFactoryImplTest.java | 13 +++
.../pulsar/broker/service/BkEnsemblesTestBase.java | 15 ++--
.../broker/service/OpportunisticStripingTest.java | 95 ++++++++++++++++++++++
6 files changed, 140 insertions(+), 11 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 45e5d8c..4e689c6 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -791,6 +791,13 @@ managedLedgerDefaultWriteQuorum=2
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2
+# with OpportunisticStriping=true the ensembleSize is adapted automatically to writeQuorum
+# in case of lack of enough bookies
+#bookkeeper_opportunisticStriping=false
+
+# you can add other configuration options for the BookKeeper client
+# by prefixing them with bookkeeper_
+
# How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds).
# Default is 60 seconds
managedLedgerCursorPositionFlushSeconds = 60
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c6b64d7..c9001a2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1044,6 +1044,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " and resolving its metadata service location"
)
private String bookkeeperMetadataServiceUri;
+
@FieldContext(
category = CATEGORY_STORAGE_BK,
doc = "Authentication plugin to use when connecting to bookies"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 7709930..f00c905 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -27,10 +27,11 @@ import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.RE
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
@@ -48,6 +49,7 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@SuppressWarnings("deprecation")
+@Slf4j
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
@@ -135,9 +137,19 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled());
bkConf.setExplictLacInterval(conf.getBookkeeperExplicitLacIntervalInMills());
- bkConf.setGetBookieInfoIntervalSeconds(conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
- bkConf.setGetBookieInfoRetryIntervalSeconds(conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
-
+ bkConf.setGetBookieInfoIntervalSeconds(
+ conf.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
+ bkConf.setGetBookieInfoRetryIntervalSeconds(
+ conf.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
+ Properties allProps = conf.getProperties();
+ allProps.forEach((key, value) -> {
+ String sKey = key.toString();
+ if (sKey.startsWith("bookkeeper_") && value != null) {
+ String bkExtraConfigKey = sKey.substring(11);
+ log.info("Extra BookKeeper client configuration {}, setting {}={}", sKey, bkExtraConfigKey, value);
+ bkConf.setProperty(bkExtraConfigKey, value);
+ }
+ });
return bkConf;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
index 0f170717..13c352c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java
@@ -205,4 +205,17 @@ public class BookKeeperClientFactoryImplTest {
}
}
+ @Test
+ public void testOpportunisticStripingConfiguration() {
+ BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl();
+ ServiceConfiguration conf = new ServiceConfiguration();
+ // default value
+ assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(), false);
+ conf.getProperties().setProperty("bookkeeper_opportunisticStriping", "true");
+ assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(), true);
+ conf.getProperties().setProperty("bookkeeper_opportunisticStriping", "false");
+ assertEquals(factory.createBkClientConfiguration(conf).getOpportunisticStriping(), false);
+
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index ec4e24d..a44e6d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -57,6 +57,10 @@ public abstract class BkEnsemblesTestBase {
this.numberOfBookies = numberOfBookies;
}
+ protected void configurePulsar(ServiceConfiguration config) throws Exception {
+ //overridable by subclasses
+ }
+
@BeforeMethod
protected void setup() throws Exception {
try {
@@ -78,6 +82,7 @@ public abstract class BkEnsemblesTestBase {
config.setAdvertisedAddress("127.0.0.1");
config.setAllowAutoTopicCreationType("non-partitioned");
config.setZooKeeperOperationTimeoutSeconds(1);
+ configurePulsar(config);
pulsar = new PulsarService(config);
pulsar.start();
@@ -95,13 +100,9 @@ public abstract class BkEnsemblesTestBase {
@AfterMethod(alwaysRun = true)
protected void shutdown() throws Exception {
- try {
- admin.close();
- pulsar.close();
- bkEnsemble.stop();
- } catch (Throwable t) {
- log.warn("Error cleaning up broker test setup state", t);
- }
+ admin.close();
+ pulsar.close();
+ bkEnsemble.stop();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java
new file mode 100644
index 0000000..221367c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpportunisticStripingTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.api.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ListLedgersResult;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import org.testng.annotations.Test;
+
+/**
+ * With BookKeeper Opportunistic Striping feature we can allow Pulsar to work
+ * with only WQ bookie during temporary outages of some bookie.
+ */
+public class OpportunisticStripingTest extends BkEnsemblesTestBase {
+
+ public OpportunisticStripingTest() {
+ // starting only two bookies
+ super(2);
+ }
+
+ @Override
+ protected void configurePulsar(ServiceConfiguration config) throws Exception {
+ // we would like to stripe over 5 bookies
+ config.setManagedLedgerDefaultEnsembleSize(5);
+ // we want 2 copies for each entry
+ config.setManagedLedgerDefaultWriteQuorum(2);
+ config.setManagedLedgerDefaultAckQuorum(2);
+
+ config.setBrokerDeleteInactiveTopicsEnabled(false);
+ config.getProperties().setProperty("bookkeeper_opportunisticStriping", "true");
+ }
+
+ @Test
+ public void testOpportunisticStriping() throws Exception {
+
+ try (PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getWebServiceAddress())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();) {
+
+ final String ns1 = "prop/usc/opportunistic1";
+ admin.namespaces().createNamespace(ns1);
+
+ final String topic1 = "persistent://" + ns1 + "/my-topic";
+ Producer<byte[]> producer = client.newProducer().topic(topic1).create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ // verify that all ledgers has the proper writequorumsize,
+ // equals to the number of available bookies (in this case 2)
+ ClientConfiguration clientConfiguration = new ClientConfiguration();
+ clientConfiguration.setZkServers("localhost:" + this.bkEnsemble.getZookeeperPort());
+
+ try (BookKeeper bkAdmin = BookKeeper.newBuilder(clientConfiguration).build()) {
+ try (ListLedgersResult list = bkAdmin.newListLedgersOp().execute().get();) {
+ int count = 0;
+ for (long ledgerId : list.toIterable()) {
+ LedgerMetadata ledgerMetadata = bkAdmin.getLedgerMetadata(ledgerId).get();
+ assertEquals(2, ledgerMetadata.getEnsembleSize());
+ assertEquals(2, ledgerMetadata.getWriteQuorumSize());
+ assertEquals(2, ledgerMetadata.getAckQuorumSize());
+ count++;
+ }
+ assertTrue(count > 0);
+ }
+ }
+ }
+ }
+
+}