You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/18 13:58:12 UTC

[GitHub] merlimat closed pull request #1804: Ensure BookKeeperClientFactory is only instantiated once in PulsarService

merlimat closed pull request #1804: Ensure BookKeeperClientFactory is only instantiated once in PulsarService
URL: https://github.com/apache/incubator-pulsar/pull/1804
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 0677800c66..c1218540b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -33,8 +34,8 @@
 
 public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
-    private ZooKeeperCache rackawarePolicyZkCache;
-    private ZooKeeperCache clientIsolationZkCache;
+    private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
+    private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
 
     @Override
     public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
@@ -67,9 +68,14 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
             bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
             bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
                     ZkBookieRackAffinityMapping.class.getName());
-            this.rackawarePolicyZkCache = new ZooKeeperCache(zkClient) {
+
+            ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
             };
-            bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache);
+            if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
+                zkc.stop();
+            }
+
+            bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache.get());
         }
 
         if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
@@ -77,8 +83,12 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
             bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
                     conf.getBookkeeperClientIsolationGroups());
             if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
-                this.clientIsolationZkCache = new ZooKeeperCache(zkClient) {
+                ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
                 };
+
+                if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
+                    zkc.stop();
+                }
                 bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache);
             }
         }
@@ -91,11 +101,11 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
     }
 
     public void close() {
-        if (this.rackawarePolicyZkCache != null) {
-            this.rackawarePolicyZkCache.stop();
+        if (this.rackawarePolicyZkCache.get() != null) {
+            this.rackawarePolicyZkCache.get().stop();
         }
-        if (this.clientIsolationZkCache != null) {
-            this.clientIsolationZkCache.stop();
+        if (this.clientIsolationZkCache.get() != null) {
+            this.clientIsolationZkCache.get().stop();
         }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5f6dbca934..3168982e9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -320,7 +320,7 @@ public void start() throws PulsarServerException {
             // Initialize and start service to access configuration repository.
             this.startZkCacheService();
 
-            this.bkClientFactory = getBookKeeperClientFactory();
+            this.bkClientFactory = newBookKeeperClientFactory();
             managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
 
             this.brokerService = new BrokerService(this);
@@ -695,10 +695,14 @@ public ZooKeeperClientFactory getZooKeeperClientFactory() {
         return zkClientFactory;
     }
 
-    public BookKeeperClientFactory getBookKeeperClientFactory() {
+    public BookKeeperClientFactory newBookKeeperClientFactory() {
         return new BookKeeperClientFactoryImpl();
     }
 
+    public BookKeeperClientFactory getBookKeeperClientFactory() {
+        return bkClientFactory;
+    }
+
     protected synchronized ScheduledExecutorService getCompactorExecutor() {
         if (this.compactorExecutor == null) {
             compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 23592b50db..bc98efa557 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -201,7 +201,7 @@ protected PulsarService startBroker(ServiceConfiguration conf) throws Exception
     protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
         // Override default providers with mocked ones
         doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
-        doReturn(mockBookKeeperClientFactory).when(pulsar).getBookKeeperClientFactory();
+        doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
 
         Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
         doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 542c5d77e2..30e455412d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -259,7 +259,7 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU
         config.setZookeeperServers("localhost:2181");
         pulsar = spy(new PulsarService(config));
         doReturn(new MockedZooKeeperClientFactoryImpl()).when(pulsar).getZooKeeperClientFactory();
-        doReturn(new MockedBookKeeperClientFactory()).when(pulsar).getBookKeeperClientFactory();
+        doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
         pulsar.start();
 
         try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services