You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/06/26 01:48:13 UTC

[pulsar] branch branch-2.4 updated: [managed-ledger] close bk-client factory gracefully (#4580)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new c8b2573  [managed-ledger] close bk-client factory gracefully (#4580)
c8b2573 is described below

commit c8b257397f989dc5fef2344b71d01fd4a228a606
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jun 24 18:16:12 2019 -0700

    [managed-ledger] close bk-client factory gracefully (#4580)
    
    ### Motivation
    User can create tools on bookkeeper using ManagedLedger factory which provides [constructor](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L121) to create ml-factory using self-managed bookkeeper (it's not used by broker).
    So, in case of self-managed bk-client, ML-Factory couldn't shutdown it gracefully and we see issue: #4573
    
    ### Modification
    - ML-Factory creates `DefaultBkFactory` to create self-managed bk-client and shutdowns same bk-client while closing the resource.
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 95ee6b4..39ff102 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -116,15 +117,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     }
 
     private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
-            ManagedLedgerFactoryConfig config)
-            throws Exception {
-        this((policyConfig) -> {
-            try {
-                return new BookKeeper(bkClientConfiguration, zkc);
-            } catch (Exception e) {
-                throw new IllegalStateException(e);
-            }
-        }, true /* isBookkeeperManaged */, zkc, config);
+            ManagedLedgerFactoryConfig config) throws Exception {
+        this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc, config);
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
@@ -171,6 +165,21 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         cacheEvictionExecutor.execute(this::cacheEvictionTask);
     }
 
+    static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+
+        private final BookKeeper bkClient;
+
+        public DefaultBkFactory(ClientConfiguration bkClientConfiguration, ZooKeeper zkc)
+                throws BKException, IOException, InterruptedException {
+            bkClient = new BookKeeper(bkClientConfiguration, zkc);
+        }
+
+        @Override
+        public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
+            return bkClient;
+        }
+    }
+    
     private synchronized void refreshStats() {
         long now = System.nanoTime();
         long period = now - lastStatTimestamp;
@@ -428,9 +437,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         if (isBookkeeperManaged) {
             try {
-                BookKeeper bkFactory = bookkeeperFactory.get();
-                if (bkFactory != null) {
-                    bkFactory.close();
+                BookKeeper bookkeeper = bookkeeperFactory.get();
+                if (bookkeeper != null) {
+                    bookkeeper.close();
                 }
             } catch (BKException e) {
                 throw new ManagedLedgerException(e);