You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/18 13:58:13 UTC

[incubator-pulsar] branch master updated: Ensure BookKeeperClientFactory is only instantiated once in PulsarService (#1804)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 655b7c0  Ensure BookKeeperClientFactory is only instantiated once in PulsarService (#1804)
655b7c0 is described below

commit 655b7c0edb32a5c22b3a9941764f9867fcb08d36
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri May 18 06:58:10 2018 -0700

    Ensure BookKeeperClientFactory is only instantiated once in PulsarService (#1804)
---
 .../pulsar/broker/BookKeeperClientFactoryImpl.java | 28 +++++++++++++++-------
 .../org/apache/pulsar/broker/PulsarService.java    |  8 +++++--
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  2 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |  2 +-
 4 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 0677800..c121854 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -33,8 +34,8 @@ import org.apache.zookeeper.ZooKeeper;
 
 public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
-    private ZooKeeperCache rackawarePolicyZkCache;
-    private ZooKeeperCache clientIsolationZkCache;
+    private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
+    private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
 
     @Override
     public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
@@ -67,9 +68,14 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
             bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
             bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
                     ZkBookieRackAffinityMapping.class.getName());
-            this.rackawarePolicyZkCache = new ZooKeeperCache(zkClient) {
+
+            ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
             };
-            bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache);
+            if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
+                zkc.stop();
+            }
+
+            bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache.get());
         }
 
         if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
@@ -77,8 +83,12 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
             bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
                     conf.getBookkeeperClientIsolationGroups());
             if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
-                this.clientIsolationZkCache = new ZooKeeperCache(zkClient) {
+                ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
                 };
+
+                if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
+                    zkc.stop();
+                }
                 bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache);
             }
         }
@@ -91,11 +101,11 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
     }
 
     public void close() {
-        if (this.rackawarePolicyZkCache != null) {
-            this.rackawarePolicyZkCache.stop();
+        if (this.rackawarePolicyZkCache.get() != null) {
+            this.rackawarePolicyZkCache.get().stop();
         }
-        if (this.clientIsolationZkCache != null) {
-            this.clientIsolationZkCache.stop();
+        if (this.clientIsolationZkCache.get() != null) {
+            this.clientIsolationZkCache.get().stop();
         }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5f6dbca..3168982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -320,7 +320,7 @@ public class PulsarService implements AutoCloseable {
             // Initialize and start service to access configuration repository.
             this.startZkCacheService();
 
-            this.bkClientFactory = getBookKeeperClientFactory();
+            this.bkClientFactory = newBookKeeperClientFactory();
             managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
 
             this.brokerService = new BrokerService(this);
@@ -695,10 +695,14 @@ public class PulsarService implements AutoCloseable {
         return zkClientFactory;
     }
 
-    public BookKeeperClientFactory getBookKeeperClientFactory() {
+    public BookKeeperClientFactory newBookKeeperClientFactory() {
         return new BookKeeperClientFactoryImpl();
     }
 
+    public BookKeeperClientFactory getBookKeeperClientFactory() {
+        return bkClientFactory;
+    }
+
     protected synchronized ScheduledExecutorService getCompactorExecutor() {
         if (this.compactorExecutor == null) {
             compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 23592b5..bc98efa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -201,7 +201,7 @@ public abstract class MockedPulsarServiceBaseTest {
     protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
         // Override default providers with mocked ones
         doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
-        doReturn(mockBookKeeperClientFactory).when(pulsar).getBookKeeperClientFactory();
+        doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
 
         Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
         doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 542c5d7..30e4554 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -259,7 +259,7 @@ public class WebServiceTest {
         config.setZookeeperServers("localhost:2181");
         pulsar = spy(new PulsarService(config));
         doReturn(new MockedZooKeeperClientFactoryImpl()).when(pulsar).getZooKeeperClientFactory();
-        doReturn(new MockedBookKeeperClientFactory()).when(pulsar).getBookKeeperClientFactory();
+        doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
         pulsar.start();
 
         try {

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.