You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:24:28 UTC

[pulsar] branch branch-2.7 updated (5668058 -> 897203d)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 5668058  Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)
     new 4c2cded  Fix idea warning (#8588)
     new 897203d  Fixes the recovery not respect to the isolation group settings (#8961)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mledger/impl/LedgerMetadataUtils.java          |  22 +++++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  39 +-------
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  24 ++++-
 .../bookkeeper/mledger/offload/OffloadUtils.java   |   1 -
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  21 ++++-
 .../broker/authentication/PulsarSaslServer.java    |   1 -
 .../broker/cache/ConfigurationCacheService.java    |   1 -
 .../apache/pulsar/PulsarClusterMetadataSetup.java  |   2 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   2 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   4 +-
 .../pulsar/common/naming/NamespaceBundles.java     |   2 +-
 .../broker/service/BrokerBookieIsolationTest.java  |   2 +-
 .../org/apache/pulsar/client/cli/CmdProduce.java   |   1 -
 .../pulsar/client/impl/UnAckedMessageTracker.java  |   1 -
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   1 -
 .../client/impl/auth/AuthenticationDataTls.java    |   3 -
 .../pulsar/client/impl/auth/AuthenticationTls.java |   1 -
 .../client/impl/transaction/TransactionImpl.java   |   2 -
 pulsar-common/pom.xml                              |   5 +
 .../data/EnsemblePlacementPolicyConfig.java        |  76 ++++++++++++++++
 .../runtime/kubernetes/KubernetesRuntime.java      |   2 +-
 .../pulsar/functions/worker/WorkerConfig.java      |   1 -
 .../pulsar/functions/utils/ValidatorUtils.java     |   4 -
 .../utils/functioncache/FunctionCacheEntry.java    |   3 -
 .../pulsar/functions/worker/MembershipManager.java |   7 --
 .../pulsar/functions/worker/SchedulerManager.java  |   2 +-
 .../metadata/impl/zookeeper/ZKMetadataStore.java   |   1 -
 .../apache/pulsar/proxy/server/ProxyService.java   |   2 +-
 .../pulsar/zookeeper/GlobalZooKeeperCache.java     |   1 -
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |   2 +-
 .../pulsar/zookeeper/LocalZooKeeperCache.java      |   1 -
 .../zookeeper/LocalZooKeeperConnectionService.java |   5 +-
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   | 101 +++++++++++++++++----
 .../java/org/apache/pulsar/zookeeper/ZkUtils.java  |   8 +-
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    |   6 +-
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |   1 -
 .../pulsar/zookeeper/ZooKeeperSessionWatcher.java  |   1 +
 ...kIsolatedBookieEnsemblePlacementPolicyTest.java |  65 +++++++++++++
 38 files changed, 320 insertions(+), 104 deletions(-)
 create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java


[pulsar] 01/02: Fix idea warning (#8588)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4c2cded20c5b45f61711b5955d57e67c38572d50
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Mon Nov 30 11:39:08 2020 +0800

    Fix idea warning (#8588)
    
    ### Motivation
    
    When the project is opened in IntelliJ IDEA, the IDE reports several warnings that can be fixed to make the code cleaner.
    
    ### Modifications
    
    - Move the `@param` annotation to the class it belongs to
    - Omit generic type arguments that can be inferred
    - Remove unused imports
    - Fix incorrect log parameters
    
    (cherry picked from commit 77f7965673119ff40c929b065ee837fe2256a221)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java     | 2 +-
 .../java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java  | 1 -
 .../org/apache/pulsar/broker/authentication/PulsarSaslServer.java | 1 -
 .../org/apache/pulsar/broker/cache/ConfigurationCacheService.java | 1 -
 .../main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java   | 2 +-
 .../java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java  | 4 ++--
 .../java/org/apache/pulsar/common/naming/NamespaceBundles.java    | 2 +-
 .../src/main/java/org/apache/pulsar/client/cli/CmdProduce.java    | 1 -
 .../java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java | 1 -
 .../java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java | 1 -
 .../org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java | 3 ---
 .../org/apache/pulsar/client/impl/auth/AuthenticationTls.java     | 1 -
 .../apache/pulsar/client/impl/transaction/TransactionImpl.java    | 2 --
 .../pulsar/functions/runtime/kubernetes/KubernetesRuntime.java    | 2 +-
 .../java/org/apache/pulsar/functions/worker/WorkerConfig.java     | 1 -
 .../java/org/apache/pulsar/functions/utils/ValidatorUtils.java    | 4 ----
 .../pulsar/functions/utils/functioncache/FunctionCacheEntry.java  | 3 ---
 .../org/apache/pulsar/functions/worker/MembershipManager.java     | 7 -------
 .../java/org/apache/pulsar/functions/worker/SchedulerManager.java | 2 +-
 .../apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java    | 1 -
 .../main/java/org/apache/pulsar/proxy/server/ProxyService.java    | 2 +-
 .../java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java    | 1 -
 .../java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java | 2 +-
 .../java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java     | 1 -
 .../apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java  | 5 +----
 .../pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java | 6 +++---
 .../src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java        | 8 ++++++--
 .../src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java | 6 +++++-
 .../java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java  | 1 -
 .../java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java | 1 +
 30 files changed, 25 insertions(+), 50 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0c69147..ba50d2b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -466,7 +466,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 }
 
                 if (!ManagedLedgerImpl.this.config.isLazyCursorRecovery()) {
-                    log.debug("[{}] Loading cursor {}", name);
+                    log.debug("[{}] Loading cursors", name);
 
                     for (final String cursorName : consumers) {
                         log.info("[{}] Loading cursor {}", name, cursorName);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
index 83514a9..f4064e7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -37,7 +37,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats;
 
 @Slf4j
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
index 517c1fd..5fc6f26 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/PulsarSaslServer.java
@@ -38,7 +38,6 @@ import javax.security.sasl.SaslServer;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.sasl.KerberosName;
 import org.apache.pulsar.common.sasl.SaslConstants;
 
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 6fda6cd..87dd671 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -41,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Maps;
 
 /**
  * ConfigurationCacheService maintains a local in-memory cache of all the configurations and policies stored in
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 3dba3ee..f64e4c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -363,7 +363,7 @@ public class PulsarClusterMetadataSetup {
         Long maxVal = ((long) 1) << 32;
         Long segSize = maxVal / numBundles;
         List<String> partitions = Lists.newArrayList();
-        partitions.add(String.format("0x%08x", 0l));
+        partitions.add(String.format("0x%08x", 0L));
         Long curPartition = segSize;
         for (int i = 0; i < numBundles; i++) {
             if (i != numBundles - 1) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index e091616..f06e416 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1433,7 +1433,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected void internalRemovePublishRate() {
         validateSuperUserAccess();
-        log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName);
+        log.info("[{}] Remove namespace publish-rate {}/{}", clientAppId(), namespaceName, topicName);
         Entry<Policies, Stat> policiesNode = null;
         try {
             final String path = path(POLICIES, namespaceName.toString());
@@ -2447,7 +2447,7 @@ public abstract class NamespacesBase extends AdminResource {
         Long maxVal = ((long) 1) << 32;
         Long segSize = maxVal / numBundles;
         List<String> partitions = Lists.newArrayList();
-        partitions.add(String.format("0x%08x", 0l));
+        partitions.add(String.format("0x%08x", 0L));
         Long curPartition = segSize;
         for (int i = 0; i < numBundles; i++) {
             if (i != numBundles - 1) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index 9e77f75..607b2cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -81,7 +81,7 @@ public class NamespaceBundles {
                 lowerBound = upperBound;
             }
         } else {
-            this.partitions = new long[] { 0l };
+            this.partitions = new long[] { 0L };
             bundles.add(fullBundle);
         }
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 9b67a08..e44eb9f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -44,7 +44,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index dfce149..e1a9bf9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -32,7 +32,6 @@ import java.io.Closeable;
 import java.util.ArrayDeque;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index ae6f385..ba6e97d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index f11e974..588f614 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.client.impl.auth;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.security.KeyManagementException;
 import java.security.PrivateKey;
@@ -28,7 +26,6 @@ import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.function.Supplier;
 
-import org.apache.commons.compress.utils.IOUtils;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.common.util.FileModifiedTimeUpdater;
 import org.apache.pulsar.common.util.SecurityUtility;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index 326fa46..e5f8361 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl.auth;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Map;
 import java.util.function.Supplier;
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 5efb58c..a122ff7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl.transaction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,7 +30,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Lists;
-import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index f8a2ec1..6741d14 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -260,7 +260,7 @@ public class KubernetesRuntime implements Runtime {
                         authConfig,
                         "$" + ENV_SHARD_ID,
                         grpcPort,
-                        -1l,
+                        -1L,
                         logConfigFile,
                         secretsProviderClassName,
                         secretsProviderConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 73acafa..32acae8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
 import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig;
-import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 
 @Data
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 9203cba..2d6325f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.utils;
 
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.CryptoConfig;
@@ -33,14 +32,11 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.common.util.Reflections.createInstance;
 
 @Slf4j
 public class ValidatorUtils {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
index 81d39d9..e9fb409 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
@@ -33,9 +33,6 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
-import static org.apache.commons.lang3.StringUtils.isNoneBlank;
-import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
-
 import org.apache.pulsar.common.nar.NarClassLoader;
 
 /**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 2bb5613..bb5a2d0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -28,20 +28,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index b1ac384..05c3790 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -213,7 +213,7 @@ public class SchedulerManager implements AutoCloseable {
                         try {
                             runnable.run();
                         } catch (Throwable th) {
-                            log.error("Encountered error when invoking scheduler", errMsg);
+                            log.error("Encountered error when invoking scheduler [{}]", errMsg);
                             errorNotifier.triggerError(th);
                         }
                     }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
index eb03272..e35b583 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index e45f9c6..34ec40b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -131,7 +131,7 @@ public class ProxyService implements Closeable {
                 new Semaphore(proxyConfig.getMaxConcurrentLookupRequests(), false));
 
         if (proxyConfig.getProxyLogLevel().isPresent()) {
-            proxyLogLevel = Integer.valueOf(proxyConfig.getProxyLogLevel().get());
+            proxyLogLevel = proxyConfig.getProxyLogLevel().get();
         } else {
             proxyLogLevel = 0;
         }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index 9788b52..0b856f1 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
  * and invalidated by the ZNode path. For the data cache, ZNode data parsing is done at request time with the given
  * {@link Deserializer} argument.
  *
- * @param <T>
  */
 public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(GlobalZooKeeperCache.class);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 925d5c6..0672c26 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -85,7 +85,7 @@ public class LocalBookkeeperEnsemble {
     public static final int CONNECTION_TIMEOUT = 30000;
 
     int numberOfBookies;
-    private boolean clearOldData = false;
+    private boolean clearOldData;
 
     private static class BasePortManager implements Supplier<Integer> {
 
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index b6cb6d1..b3a1520 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
  * and invalidated by the ZNode path. For the data cache, ZNode data parsing is done at request time with the given
  * {@link Deserializer} argument.
  *
- * @param <T>
  */
 public class LocalZooKeeperCache extends ZooKeeperCache {
 
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
index e4be27e..85ec400 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
-import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -108,9 +107,7 @@ public class LocalZooKeeperConnectionService implements Closeable {
 
         // check if the node exists
         if (zkc.exists(path, false) == null) {
-            /**
-             * create znode
-             */
+            //create znode
             try {
                 // do create the node
                 zkc.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 43d727a..efa4120 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -59,8 +59,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
 
     private ZooKeeperCache bookieMappingCache = null;
 
-    private final List<String> primaryIsolationGroups = new ArrayList<String>();
-    private final List<String> secondaryIsolationGroups = new ArrayList<String>();
+    private final List<String> primaryIsolationGroups = new ArrayList<>();
+    private final List<String> secondaryIsolationGroups = new ArrayList<>();
     private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
 
     public ZkIsolatedBookieEnsemblePlacementPolicy() {
@@ -194,7 +194,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                 // if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well.
                 if (totalAvailableBookiesInPrimaryGroup < ensembleSize) {
                     LOG.info(
-                            "Not found enough available-bookies from primary isolation group {} , checking secondary group",
+                            "Not found enough available-bookies from primary isolation group [{}] , checking secondary group [{}]",
                             primaryIsolationGroups, secondaryIsolationGroups);
                     for (String group : secondaryIsolationGroups) {
                         Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java
index b49326b..c40a11c 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java
@@ -85,9 +85,13 @@ public final class ZkUtils {
      * @return the parent path or null if no parent exists
      */
     public static String getParentForPath(final String path) {
-        if (path == null) return null;
+        if (path == null) {
+            return null;
+        }
         final int length = path.length();
-        if (length == 0 || (length == 1 && path.charAt(0) == '/')) return null;
+        if (length == 0 || (length == 1 && path.charAt(0) == '/')) {
+            return null;
+        }
 
         int partStartIndex = 0;
         char lastChar = path.charAt(0);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 390222b..8929f9b 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -62,9 +62,13 @@ import org.slf4j.LoggerFactory;
  * and invalidated by the ZNode path. For the data cache, ZNode data parsing is done at request time with the given
  * {@link Deserializer} argument.
  *
- * @param <T>
  */
 public abstract class ZooKeeperCache implements Watcher {
+
+    /**
+     *
+     * @param <T> the type of zookeeper content
+     */
     public static interface Deserializer<T> {
         T deserialize(String key, byte[] content) throws Exception;
     }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index ce64480..3cc381f 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.zookeeper;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
index fa70c9e..7cf21ea 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
@@ -180,6 +180,7 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
         }
     }
 
+    @Override
     public void close() {
         if (scheduler != null) {
             scheduler.shutdownNow();


[pulsar] 02/02: Fixes the recovery not respect to the isolation group settings (#8961)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 897203d909e89d7d31e06f97a3d90a709ef1a2f8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed Dec 30 18:24:21 2020 +0800

    Fixes the recovery not respect to the isolation group settings (#8961)
    
    ---
    
    *Motivation*
    
    When users configure ZkIsolatedBookieEnsemblePlacementPolicy, it is
    hard to make AutoRecovery respect the isolation group settings.
    Because we don't store the isolation group information as part of the
    ledger metadata, the framework has no information to use when
    choosing the bookies.
    
    *Modifications*
    
    - Change the ZkIsolatedBookieEnsemblePlacementPolicy to use the policy
    passed from the custom metadata
    
    (cherry picked from commit bddd030aa1a0b8b3b1767b337880baeb35662ffd)
---
 .../mledger/impl/LedgerMetadataUtils.java          |  22 +++++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  39 +-------
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  22 +++++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  21 ++++-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   2 +-
 .../broker/service/BrokerBookieIsolationTest.java  |   2 +-
 pulsar-common/pom.xml                              |   5 +
 .../data/EnsemblePlacementPolicyConfig.java        |  76 ++++++++++++++++
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   | 101 +++++++++++++++++----
 ...kIsolatedBookieEnsemblePlacementPolicyTest.java |  65 +++++++++++++
 10 files changed, 298 insertions(+), 57 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 3a245d1..1f59603 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -19,6 +19,10 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
@@ -99,6 +103,24 @@ public final class LedgerMetadataUtils {
         );
     }
 
+    /**
+     * Build additional metadata for the placement policy config.
+     *
+     * @param className
+     *          the ensemble placement policy classname
+     * @param properties
+     *          the ensemble placement policy properties
+     * @return
+     *          the additional metadata
+     * @throws ParseJsonException
+     *          placement policy configuration encode error
+     */
+    static Map<String, byte[]> buildMetadataForPlacementPolicyConfig(
+        Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties) throws ParseJsonException {
+        EnsemblePlacementPolicyConfig config = new EnsemblePlacementPolicyConfig(className, properties);
+        return ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode());
+    }
+
     private LedgerMetadataUtils() {}
 
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f411eff..7efb471 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
-import com.google.common.base.Objects;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
 
@@ -45,7 +44,6 @@ import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -81,6 +79,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
@@ -871,41 +870,5 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         BookKeeper get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata);
     }
 
-    public static class EnsemblePlacementPolicyConfig {
-        private final Class<? extends EnsemblePlacementPolicy> policyClass;
-        private final Map<String, Object> properties;
-
-        public EnsemblePlacementPolicyConfig(Class<? extends EnsemblePlacementPolicy> policyClass,
-                Map<String, Object> properties) {
-            super();
-            this.policyClass = policyClass;
-            this.properties = properties;
-        }
-
-        public Class<? extends EnsemblePlacementPolicy> getPolicyClass() {
-            return policyClass;
-        }
-
-        public Map<String, Object> getProperties() {
-            return properties;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(policyClass != null ? policyClass.getName() : "", properties);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj instanceof EnsemblePlacementPolicyConfig) {
-                EnsemblePlacementPolicyConfig other = (EnsemblePlacementPolicyConfig) obj;
-                return Objects.equal(this.policyClass == null ? null : this.policyClass.getName(),
-                        other.policyClass == null ? null : other.policyClass.getName())
-                        && Objects.equal(this.properties, other.properties);
-            }
-            return false;
-        }
-    }
-
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ba50d2b..4fe5318 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -74,6 +74,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Retries;
@@ -249,6 +250,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
      */
     final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>();
 
+    /**
+     * Holds the custom metadata assembled for the most recent ledger creation; it is read only by the test
+     * {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
+     */
+    @VisibleForTesting
+    Map<String, byte[]> createdLedgerCustomMetadata;
+
     // //////////////////////////////////////////////////////////////////////
 
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
@@ -3239,6 +3247,20 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         Map<String, byte[]> finalMetadata = new HashMap<>();
         finalMetadata.putAll(ledgerMetadata);
         finalMetadata.putAll(metadata);
+        if (config.getBookKeeperEnsemblePlacementPolicyClassName() != null
+            && config.getBookKeeperEnsemblePlacementPolicyProperties() != null) {
+            try {
+                finalMetadata.putAll(LedgerMetadataUtils.buildMetadataForPlacementPolicyConfig(
+                    config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                    config.getBookKeeperEnsemblePlacementPolicyProperties()
+                ));
+            } catch (JsonUtil.ParseJsonException e) {
+                log.error("[{}] Serialize the placement configuration failed", name, e);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                return;
+            }
+        }
+        createdLedgerCustomMetadata = finalMetadata;
         log.info("[{}] Creating ledger, metadata: {} - metadata ops timeout : {} seconds",
             name, finalMetadata, config.getMetadataOperationsTimeoutSeconds());
         try {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e6388c4..35f7921 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -69,6 +70,7 @@ import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
@@ -2791,6 +2793,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
     }
 
+    private abstract class MockedPlacementPolicy implements EnsemblePlacementPolicy{}
+
+    @Test(timeOut = 10000)
+    public void testManagedLedgerWithPlacementPolicyInCustomMetadata() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(MockedPlacementPolicy.class);
+        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(Collections.singletonMap("key", "value"));
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", managedLedgerConfig);
+        assertFalse(ledger.createdLedgerCustomMetadata.isEmpty());
+        byte[] configData = ledger.createdLedgerCustomMetadata.get(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG);
+        EnsemblePlacementPolicyConfig config = EnsemblePlacementPolicyConfig.decode(configData);
+        assertEquals(config.getPolicyClass().getName(), MockedPlacementPolicy.class.getName());
+        assertEquals(config.getProperties().size(), 1);
+        assertTrue(config.getProperties().containsKey("key"));
+        assertEquals(config.getProperties().get("key"), "value");
+    }
+
     private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
         Field field = clazz.getDeclaredField(fieldName);
         field.setAccessible(true);
@@ -2806,4 +2825,4 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 5f6cdaf..398da08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -30,12 +30,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index bb0ae00..9662f3e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -40,7 +40,6 @@ import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.net.BookieId;
@@ -63,6 +62,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index a22f82d..f5ef697 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -102,6 +102,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>io.airlift</groupId>
       <artifactId>aircompressor</artifactId>
     </dependency>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
new file mode 100644
index 0000000..2c42f14
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import com.google.common.base.Objects;
+import org.apache.bookkeeper.common.util.JsonUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+
+public class EnsemblePlacementPolicyConfig {
+    public static final String ENSEMBLE_PLACEMENT_POLICY_CONFIG = "EnsemblePlacementPolicyConfig";
+    private final Class policyClass;
+    private final Map<String, Object> properties;
+
+    // Default constructor, used when decode() constructs an instance from its encoded bytes.
+    private EnsemblePlacementPolicyConfig() {
+        this.policyClass = null;
+        this.properties = Collections.emptyMap();
+    }
+
+    public EnsemblePlacementPolicyConfig(Class policyClass, Map<String, Object> properties) {
+        super();
+        this.policyClass = policyClass;
+        this.properties = properties;
+    }
+
+    public Class getPolicyClass() {
+        return policyClass;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(policyClass != null ? policyClass.getName() : "", properties);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof EnsemblePlacementPolicyConfig) {
+            EnsemblePlacementPolicyConfig other = (EnsemblePlacementPolicyConfig) obj;
+            return Objects.equal(this.policyClass == null ? null : this.policyClass.getName(),
+                other.policyClass == null ? null : other.policyClass.getName())
+                && Objects.equal(this.properties, other.properties);
+        }
+        return false;
+    }
+
+    public byte[] encode() throws JsonUtil.ParseJsonException {
+        return JsonUtil.toJson(this).getBytes(StandardCharsets.UTF_8);
+    }
+
+    public static EnsemblePlacementPolicyConfig decode(byte[] data) throws JsonUtil.ParseJsonException {
+        return JsonUtil.fromJson(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class);
+    }
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index efa4120..5cca446 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.zookeeper;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -29,18 +31,23 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
+import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
+import org.inferred.freebuilder.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +57,8 @@ import io.netty.util.HashedWheelTimer;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieAddressResolver;
 
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+
 public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy
         implements Deserializer<BookiesRackConfiguration> {
     private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);
@@ -59,9 +68,10 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
 
     private ZooKeeperCache bookieMappingCache = null;
 
-    private final List<String> primaryIsolationGroups = new ArrayList<>();
-    private final List<String> secondaryIsolationGroups = new ArrayList<>();
     private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
+    // Default isolation groups, stored as a pair: the left value is the set of primary groups and the right
+    // value is the set of secondary groups.
+    private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
 
     public ZkIsolatedBookieEnsemblePlacementPolicy() {
         super();
@@ -71,6 +81,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
     public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
             Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
             StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
+        Set<String> primaryIsolationGroups = new HashSet<>();
+        Set<String> secondaryIsolationGroups = new HashSet<>();
         if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
             String isolationGroupsString = castToString(conf.getProperty(ISOLATION_BOOKIE_GROUPS));
             if (!isolationGroupsString.isEmpty()) {
@@ -88,10 +100,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                 }
             }
         }
+        defaultIsolationGroups = ImmutablePair.of(primaryIsolationGroups, secondaryIsolationGroups);
         return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger, bookieAddressResolver);
     }
 
-    private String castToString(Object obj) {
+    private static String castToString(Object obj) {
+        if (null == obj) {
+            return "";
+        }
         if (obj instanceof List<?>) {
             List<String> result = new ArrayList<>();
             for (Object o : (List<?>) obj) {
@@ -134,7 +150,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
     public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
             throws BKNotEnoughBookiesException {
-        Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
+        Map<String, List<String>> isolationGroup = new HashMap<>();
+        Set<BookieId> blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(
+            ensembleSize, defaultIsolationGroups);
         if (excludeBookies == null) {
             excludeBookies = new HashSet<BookieId>();
         }
@@ -147,7 +165,18 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
             Map<String, byte[]> customMetadata, List<BookieId> currentEnsemble,
             BookieId bookieToReplace, Set<BookieId> excludeBookies)
             throws BKNotEnoughBookiesException {
-        Set<BookieId> blacklistedBookies = getBlacklistedBookies(ensembleSize);
+        // Parse the ensemble placement policy config from the custom metadata. If it is present, its isolation
+        // groups are used to filter the bookies; otherwise the default isolation groups are used.
+        Optional<EnsemblePlacementPolicyConfig> ensemblePlacementPolicyConfig =
+            getEnsemblePlacementPolicyConfig(customMetadata);
+        Set<BookieId> blacklistedBookies;
+        if (ensemblePlacementPolicyConfig.isPresent()) {
+            EnsemblePlacementPolicyConfig config = ensemblePlacementPolicyConfig.get();
+            Pair<Set<String>, Set<String>> groups = getIsolationGroup(config);
+            blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(ensembleSize, groups);
+        } else {
+            blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(ensembleSize, defaultIsolationGroups);
+        }
         if (excludeBookies == null) {
             excludeBookies = new HashSet<BookieId>();
         }
@@ -156,26 +185,66 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                 bookieToReplace, excludeBookies);
     }
 
-    private Set<BookieId> getBlacklistedBookies(int ensembleSize) {
-        Set<BookieId> blacklistedBookies = new HashSet<BookieId>();
+    private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolicyConfig(
+        Map<String, byte[]> customMetadata) {
+
+        byte[] ensemblePlacementPolicyConfigData = customMetadata.get(
+            EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG);
+        if (ensemblePlacementPolicyConfigData != null) {
+            try {
+                return Optional.ofNullable(EnsemblePlacementPolicyConfig.decode(ensemblePlacementPolicyConfigData));
+            } catch (JsonUtil.ParseJsonException e) {
+                LOG.error("Failed to parse the ensemble placement policy config from the custom metadata", e);
+                return Optional.empty();
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static Pair<Set<String>, Set<String>> getIsolationGroup(EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
+        MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
+        String className = ZkIsolatedBookieEnsemblePlacementPolicy.class.getName();
+        if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
+            Map<String, Object> properties = ensemblePlacementPolicyConfig.getProperties();
+            String primaryIsolationGroupString = castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
+            String secondaryIsolationGroupString = castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
+            if (!primaryIsolationGroupString.isEmpty()) {
+                pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
+            }
+            if (!secondaryIsolationGroupString.isEmpty()) {
+                pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
+            }
+        }
+        return pair;
+    }
+
+    private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
+        Pair<Set<String>, Set<String>> isolationGroups) {
+        Set<BookieId> blacklistedBookies = new HashSet<>();
         try {
             if (bookieMappingCache != null) {
                 BookiesRackConfiguration allGroupsBookieMapping = bookieMappingCache
-                        .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
-                        .orElseThrow(() -> new KeeperException.NoNodeException(
-                                ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
+                    .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
+                    .orElseThrow(() -> new KeeperException.NoNodeException(
+                        ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
                 Set<String> allBookies = allGroupsBookieMapping.keySet();
                 int totalAvailableBookiesInPrimaryGroup = 0;
+                Set<String> primaryIsolationGroup = Collections.emptySet();
+                Set<String> secondaryIsolationGroup = Collections.emptySet();
+                if (isolationGroups != null) {
+                    primaryIsolationGroup = isolationGroups.getLeft();
+                    secondaryIsolationGroup = isolationGroups.getRight();
+                }
                 for (String group : allBookies) {
                     Set<String> bookiesInGroup = allGroupsBookieMapping.get(group).keySet();
-                    if (!primaryIsolationGroups.contains(group)) {
+                    if (!primaryIsolationGroup.contains(group)) {
                         for (String bookieAddress : bookiesInGroup) {
                             blacklistedBookies.add(BookieId.parse(bookieAddress));
                         }
                     } else {
                         for (String groupBookie : bookiesInGroup) {
                             totalAvailableBookiesInPrimaryGroup += knownBookies
-                                    .containsKey(BookieId.parse(groupBookie)) ? 1 : 0;
+                                .containsKey(BookieId.parse(groupBookie)) ? 1 : 0;
                         }
                     }
                 }
@@ -183,7 +252,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                 // groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so,
                 // try to remove those overlapped bookies from excluded-bookie list because they are also part of
                 // isolated-group bookies.
-                for (String group : primaryIsolationGroups) {
+                for (String group : primaryIsolationGroup) {
                     Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
                     if (bookieGroup != null && !bookieGroup.isEmpty()) {
                         for (String bookieAddress : bookieGroup.keySet()) {
@@ -194,9 +263,9 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                 // if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well.
                 if (totalAvailableBookiesInPrimaryGroup < ensembleSize) {
                     LOG.info(
-                            "Not found enough available-bookies from primary isolation group [{}] , checking secondary group [{}]",
-                            primaryIsolationGroups, secondaryIsolationGroups);
-                    for (String group : secondaryIsolationGroups) {
+                        "Not found enough available-bookies from primary isolation group [{}] , checking secondary group [{}]",
+                        primaryIsolationGroup, secondaryIsolationGroup);
+                    for (String group : secondaryIsolationGroup) {
                         Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
                         if (bookieGroup != null && !bookieGroup.isEmpty()) {
                             for (String bookieAddress : bookieGroup.keySet()) {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index e3f1a9c..6ed31fc 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.util.HashedWheelTimer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -449,4 +451,67 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
     }
+
+    /**
+     * Test case for auto-recovery.
+     * When auto-recovery is triggered from BookKeeper, the placement policy must read the isolation groups from
+     * the custom metadata and apply them when choosing the replacement bookie.
+     */
+    @Test
+    public void testTheIsolationPolicyUsingCustomMetadata() throws Exception {
+        // Configure two groups for the isolation policy: a 'primary' group and a 'secondary' group.
+        // bookie1, bookie2 and bookie3 go into the 'primary' group; bookie4 goes into the 'secondary' group.
+        // The resulting mapping is then written to ZooKeeper under BOOKIE_INFO_ROOT_PATH.
+        Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+        Map<String, BookieInfo> primaryIsolationBookieGroups = new HashMap<>();
+        String primaryGroupName = "primary";
+        String secondaryGroupName = "secondary";
+        primaryIsolationBookieGroups.put(BOOKIE1, new BookieInfo("rack0", null));
+        primaryIsolationBookieGroups.put(BOOKIE2, new BookieInfo("rack0", null));
+        primaryIsolationBookieGroups.put(BOOKIE3, new BookieInfo("rack1", null));
+
+        Map<String, BookieInfo> secondaryIsolationBookieGroups = new HashMap<>();
+        secondaryIsolationBookieGroups.put(BOOKIE4, new BookieInfo("rack0", null));
+        bookieMapping.put(primaryGroupName, primaryIsolationBookieGroups);
+        bookieMapping.put(secondaryGroupName, secondaryIsolationBookieGroups);
+
+        ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+            jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        Thread.sleep(100);
+
+        // Prepare a custom placement policy config and put it into the custom metadata. The isolation policy should
+        // decode it from the custom metadata and apply it when computing the bookie blacklist.
+        Map<String, Object> placementPolicyProperties = new HashMap<>();
+        placementPolicyProperties.put(
+            ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, primaryGroupName);
+        placementPolicyProperties.put(
+            ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, secondaryGroupName);
+        EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig(
+            ZkIsolatedBookieEnsemblePlacementPolicy.class,
+            placementPolicyProperties
+        );
+        Map<String, byte[]> customMetadata = new HashMap<>();
+        customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());
+
+        // Run the scenario under test: initialize the policy, then ask it for a replacement bookie.
+        ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
+        });
+        bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, primaryGroupName);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
+            NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        // Assume an ensemble consisting of bookie1 and bookie3, where bookie3 is broken.
+        // The replacement must come from the 'primary' group, so it should be bookie2, because that group only
+        // contains bookie1, bookie2 and bookie3.
+        BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId();
+        BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId();
+        BookieId bookie3Id = new BookieSocketAddress(BOOKIE3).toBookieId();
+        BookieId bookieId = isolationPolicy.replaceBookie(2, 1, 1, customMetadata,
+            Arrays.asList(bookie1Id, bookie3Id), bookie3Id, null).getResult();
+        assertEquals(bookieId, bookie2Id);
+    }
 }