You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/18 13:58:13 UTC
[incubator-pulsar] branch master updated: Ensure
BookKeeperClientFactory is only instantiated once in PulsarService (#1804)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 655b7c0 Ensure BookKeeperClientFactory is only instantiated once in PulsarService (#1804)
655b7c0 is described below
commit 655b7c0edb32a5c22b3a9941764f9867fcb08d36
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri May 18 06:58:10 2018 -0700
Ensure BookKeeperClientFactory is only instantiated once in PulsarService (#1804)
---
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 28 +++++++++++++++-------
.../org/apache/pulsar/broker/PulsarService.java | 8 +++++--
.../broker/auth/MockedPulsarServiceBaseTest.java | 2 +-
.../apache/pulsar/broker/web/WebServiceTest.java | 2 +-
4 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 0677800..c121854 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -33,8 +34,8 @@ import org.apache.zookeeper.ZooKeeper;
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
- private ZooKeeperCache rackawarePolicyZkCache;
- private ZooKeeperCache clientIsolationZkCache;
+ private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
+ private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
@@ -67,9 +68,14 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName());
- this.rackawarePolicyZkCache = new ZooKeeperCache(zkClient) {
+
+ ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
};
- bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache);
+ if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
+ zkc.stop();
+ }
+
+ bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache.get());
}
if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
@@ -77,8 +83,12 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
- this.clientIsolationZkCache = new ZooKeeperCache(zkClient) {
+ ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
};
+
+ if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
+ zkc.stop();
+ }
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache);
}
}
@@ -91,11 +101,11 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
}
public void close() {
- if (this.rackawarePolicyZkCache != null) {
- this.rackawarePolicyZkCache.stop();
+ if (this.rackawarePolicyZkCache.get() != null) {
+ this.rackawarePolicyZkCache.get().stop();
}
- if (this.clientIsolationZkCache != null) {
- this.clientIsolationZkCache.stop();
+ if (this.clientIsolationZkCache.get() != null) {
+ this.clientIsolationZkCache.get().stop();
}
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5f6dbca..3168982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -320,7 +320,7 @@ public class PulsarService implements AutoCloseable {
// Initialize and start service to access configuration repository.
this.startZkCacheService();
- this.bkClientFactory = getBookKeeperClientFactory();
+ this.bkClientFactory = newBookKeeperClientFactory();
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
this.brokerService = new BrokerService(this);
@@ -695,10 +695,14 @@ public class PulsarService implements AutoCloseable {
return zkClientFactory;
}
- public BookKeeperClientFactory getBookKeeperClientFactory() {
+ public BookKeeperClientFactory newBookKeeperClientFactory() {
return new BookKeeperClientFactoryImpl();
}
+ public BookKeeperClientFactory getBookKeeperClientFactory() {
+ return bkClientFactory;
+ }
+
protected synchronized ScheduledExecutorService getCompactorExecutor() {
if (this.compactorExecutor == null) {
compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 23592b5..bc98efa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -201,7 +201,7 @@ public abstract class MockedPulsarServiceBaseTest {
protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
- doReturn(mockBookKeeperClientFactory).when(pulsar).getBookKeeperClientFactory();
+ doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 542c5d7..30e4554 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -259,7 +259,7 @@ public class WebServiceTest {
config.setZookeeperServers("localhost:2181");
pulsar = spy(new PulsarService(config));
doReturn(new MockedZooKeeperClientFactoryImpl()).when(pulsar).getZooKeeperClientFactory();
- doReturn(new MockedBookKeeperClientFactory()).when(pulsar).getBookKeeperClientFactory();
+ doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
pulsar.start();
try {
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.