You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/06/26 01:48:13 UTC
[pulsar] branch branch-2.4 updated: [managed-ledger] close
bk-client factory gracefully (#4580)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new c8b2573 [managed-ledger] close bk-client factory gracefully (#4580)
c8b2573 is described below
commit c8b257397f989dc5fef2344b71d01fd4a228a606
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jun 24 18:16:12 2019 -0700
[managed-ledger] close bk-client factory gracefully (#4580)
### Motivation
User can create tools on bookkeeper using ManagedLedger factory which provides [constructor](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java#L121) to create ml-factory using self-managed bookkeeper (it's not used by broker).
So, in case of self-managed bk-client, ML-Factory couldn't shutdown it gracefully and we see issue: #4573
### Modification
- ML-Factory creates `DefaultBkFactory` to create self-managed bk-client and shutdowns same bk-client while closing the resource.
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 33 ++++++++++++++--------
1 file changed, 21 insertions(+), 12 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 95ee6b4..39ff102 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -116,15 +117,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}
private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
- ManagedLedgerFactoryConfig config)
- throws Exception {
- this((policyConfig) -> {
- try {
- return new BookKeeper(bkClientConfiguration, zkc);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }, true /* isBookkeeperManaged */, zkc, config);
+ ManagedLedgerFactoryConfig config) throws Exception {
+ this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc, config);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
@@ -171,6 +165,21 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
cacheEvictionExecutor.execute(this::cacheEvictionTask);
}
+ static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+
+ private final BookKeeper bkClient;
+
+ public DefaultBkFactory(ClientConfiguration bkClientConfiguration, ZooKeeper zkc)
+ throws BKException, IOException, InterruptedException {
+ bkClient = new BookKeeper(bkClientConfiguration, zkc);
+ }
+
+ @Override
+ public BookKeeper get(EnsemblePlacementPolicyConfig policy) {
+ return bkClient;
+ }
+ }
+
private synchronized void refreshStats() {
long now = System.nanoTime();
long period = now - lastStatTimestamp;
@@ -428,9 +437,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
if (isBookkeeperManaged) {
try {
- BookKeeper bkFactory = bookkeeperFactory.get();
- if (bkFactory != null) {
- bkFactory.close();
+ BookKeeper bookkeeper = bookkeeperFactory.get();
+ if (bookkeeper != null) {
+ bookkeeper.close();
}
} catch (BKException e) {
throw new ManagedLedgerException(e);