You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/30 01:38:59 UTC

[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19102: [improve][broker] PIP-192: Implement extensible load manager

heesung-sn commented on code in PR #19102:
URL: https://github.com/apache/pulsar/pull/19102#discussion_r1059106101


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -794,6 +802,12 @@ public void start() throws PulsarServerException {
             }
             brokerService.start();
 
+            if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {

Review Comment:
   Let's define a static function and globally use it from PulsarService, NamespaceService, and other places.
   
   Proposal:
   ```
   ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(ServiceConfiguration conf){
      return ExtensibleLoadManagerImpl.class.getname().equals(conf.getLoadManagerClassName())
   }
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -1085,6 +1129,9 @@ protected void closeLocalMetadataStore() throws Exception {
     }
 
     protected void startLeaderElectionService() {
+        if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {
+            return;

Review Comment:
   Use the global `isLoadManagerExtensionEnabled ` static func, as discussed above.
   
   Also, plz add a log.
   log.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.")



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -825,6 +839,11 @@ public void start() throws PulsarServerException {
                 this.webSocketService.setLocalCluster(clusterData);
             }
 
+            // By starting the Load manager service, the broker will also become visible
+            // to the rest of the broker by creating the registration z-node. This needs
+            // to be done only when the broker is fully operative.
+            this.startLoadManagementService();

Review Comment:
   Please add a comment that the load manager service and its service unit state channel need to be initialized first( namespace service depends on load manager)



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -925,6 +939,36 @@ public void start() throws PulsarServerException {
         }
     }
 
+    protected void createNamespaceIfNotExists(String cluster, String publicTenant, NamespaceName ns) throws Exception {
+        ClusterResources cr = this.getPulsarResources().getClusterResources();
+        TenantResources tr = this.getPulsarResources().getTenantResources();
+        NamespaceResources nsr = this.getPulsarResources().getNamespaceResources();
+
+        if (!cr.clusterExists(cluster)) {
+            cr.createCluster(cluster,
+                    ClusterData.builder()
+                            .serviceUrl(this.getWebServiceAddress())
+                            .serviceUrlTls(this.getWebServiceAddressTls())
+                            .brokerServiceUrl(this.getBrokerServiceUrl())
+                            .brokerServiceUrlTls(this.getBrokerServiceUrlTls())
+                            .build());
+        }
+
+        if (!tr.tenantExists(publicTenant)) {
+            tr.createTenant(publicTenant,
+                    TenantInfo.builder()
+                            .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
+                            .allowedClusters(Sets.newHashSet(cluster))
+                            .build());
+        }
+
+        if (!nsr.namespaceExists(ns)) {
+            Policies nsp = new Policies();

Review Comment:
   Don't we need to create the namespace before?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java:
##########
@@ -58,6 +61,15 @@ public interface LoadManager {
      */
     Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
 
+    default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return null;

Review Comment:
   Please throw `UnsupportedOperationException` for the default functions. 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java:
##########
@@ -58,6 +61,15 @@ public interface LoadManager {
      */
     Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
 
+    default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return null;
+    }
+
+    default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return null;

Review Comment:
   Please throw `UnsupportedOperationException` for the default functions. 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private List<BrokerFilter> brokerFilterPipeline;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+            if (brokers.isEmpty()) {
+                return Optional.empty();
+            }
+            return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+        };
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        if (this.started.get()) {

Review Comment:
   Though this `start()` call is not expected from multiple threads on the same obj. This atomic get() check can still allow multiple threads to `start()` on this obj.
   We can simply make this `synchronized` because we don't expect start() to be called competitively from many threads.
   Or, if we want to use the atomic boolean, we can use CAS here.
   ```
   if(started.compareAndSet(false, true)){
     try{
       // start logic here
     } catch ( Exception e){
      started.set(false) // to allow retry.
    }
   }
   ```
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java:
##########
@@ -0,0 +1,222 @@
+/*

Review Comment:
   I assume this class does not need a review here, as it is added in a separate PR, https://github.com/apache/pulsar/pull/18810.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private List<BrokerFilter> brokerFilterPipeline;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerSelectionStrategy = (brokers, bundle, context) -> {

Review Comment:
   https://github.com/apache/pulsar/pull/18964 has been merged. We could use LeastResourceUsageWithWeight here. 
   
   Also, we need a TODO: comment here to make brokerSelectionStrategy configurable.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        return null;
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        // No-op.
+    }
+
+    @Override
+    public void writeLoadReportOnZookeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.

Review Comment:
   Throw UnsupportedOperationException.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        return null;
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        // No-op.
+    }
+
+    @Override
+    public void writeLoadReportOnZookeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.
+    }
+
+    @Override
+    public void writeResourceQuotasToZooKeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.

Review Comment:
   Throw UnsupportedOperationException.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java:
##########
@@ -118,7 +118,7 @@ public interface ServiceUnitStateChannel extends Closeable {
      *                 the future object will time out.
      * Case 3: If none of them, it returns null.

Review Comment:
   With this change, the case 3 returns Optional.empty(). Please update the comment.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -242,11 +242,12 @@ public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
                     // TODO: discard this protocol prefix removal
                     //  by a util func that returns lookupServiceAddress(serviceUrl)
                     if (leader.isPresent()) {
-                        String broker = leader.get().getServiceUrl();
-                        broker = broker.substring(broker.lastIndexOf('/') + 1);
-                        return Optional.of(broker);
+                        return Optional.of(leader.get().getServiceUrl());
                     } else {
-                        return Optional.empty();

Review Comment:
   If we don't return Optional.empty(), we can change the output signature to  `CompletableFuture<String>`.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java:
##########
@@ -0,0 +1,313 @@
+/*

Review Comment:
   I assume this class does not need a review here, as it is added in a separate PR, https://github.com/apache/pulsar/pull/18810.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -171,11 +172,21 @@ public void initialize() {
         }
     }
 
+    public boolean isExtensibleLoadManager(){

Review Comment:
   Please use the static func, isExtensibleLoadManagerEnabled(), as discussed above.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java:
##########
@@ -132,7 +132,7 @@ public interface ServiceUnitStateChannel extends Closeable {
      * case 2: If the assigned broker does not take the ownership in time,
      *         the future object will time out.
      */
-    CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker);
+    CompletableFuture<Optional<String>> publishAssignEventAsync(String serviceUnit, String broker);

Review Comment:
   Why do we need Optional here? I don't think we return Optional.empty here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.extensions;

Review Comment:
   I assume this class does not need a review here, as it is added in a separate PR, https://github.com/apache/pulsar/pull/18810.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private List<BrokerFilter> brokerFilterPipeline;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+            if (brokers.isEmpty()) {
+                return Optional.empty();
+            }
+            return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+        };
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        if (this.started.get()) {
+            return;
+        }
+        this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+        this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+        this.brokerRegistry.start();
+        this.serviceUnitStateChannel.start();
+
+        // TODO: Start the load data store.
+
+        this.context = LoadManagerContextImpl.builder()
+                .configuration(conf)
+                .brokerRegistry(brokerRegistry)
+                .brokerLoadDataStore(null)
+                .topBundleLoadDataStore(null).build();
+        // TODO: Start load data reporter.
+
+        // TODO: Start unload scheduler and bundle split scheduler
+
+        this.started.set(true);
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.conf = pulsar.getConfiguration();
+    }
+
+    @Override
+    public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
+                                                                ServiceUnitId serviceUnit) {
+
+        final String bundle = serviceUnit.toString();
+
+        CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
+            final CompletableFuture<Optional<String>> owner;
+            // Assign the bundle to channel owner if is internal topic, to avoid circular references.
+            if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+                owner = serviceUnitStateChannel.getChannelOwnerAsync();
+            } else {
+                owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+                    // If the bundle not assign yet, select and publish assign event to channel.
+                    if (broker.isEmpty()) {
+                        return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+                            if (brokerOpt.isPresent()) {
+                                log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
+                                return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+                            } else {
+                                throw new IllegalStateException(
+                                        "Failed to discover(select) the new owner broker for bundle: " + bundle);
+                            }
+                        });
+                    }
+                    // Already assigned, return it.
+                    return CompletableFuture.completedFuture(broker);
+                });
+            }
+
+            return owner.thenCompose(broker -> {
+                if (broker.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(broker.get());
+            }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+                if (brokerLookupData.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(brokerLookupData);
+            }));
+        });
+        future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+        return future;
+    }
+
+    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
+        BrokerRegistry brokerRegistry = getBrokerRegistry();
+        return brokerRegistry.getAvailableBrokerLookupDataAsync()
+                .thenCompose(availableBrokers -> {
+                    // TODO: Support isolation policies
+                    LoadManagerContext context = this.getContext();
+
+                    // Filter out brokers that do not meet the rules.
+                    List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
+                    Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);
+                    for (final BrokerFilter filter : filterPipeline) {
+                        try {
+                            filter.filter(availableBrokerCandidates, context);
+                        } catch (BrokerFilterException e) {
+                            availableBrokerCandidates = availableBrokers;

Review Comment:
   plz add an error log.
   
   Why don't we throw an exception here or return the empty broker?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private List<BrokerFilter> brokerFilterPipeline;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+            if (brokers.isEmpty()) {
+                return Optional.empty();
+            }
+            return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+        };
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        if (this.started.get()) {
+            return;
+        }
+        this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+        this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+        this.brokerRegistry.start();
+        this.serviceUnitStateChannel.start();
+
+        // TODO: Start the load data store.
+
+        this.context = LoadManagerContextImpl.builder()
+                .configuration(conf)
+                .brokerRegistry(brokerRegistry)
+                .brokerLoadDataStore(null)
+                .topBundleLoadDataStore(null).build();
+        // TODO: Start load data reporter.
+
+        // TODO: Start unload scheduler and bundle split scheduler
+
+        this.started.set(true);
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.conf = pulsar.getConfiguration();
+    }
+
+    @Override
+    public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
+                                                                ServiceUnitId serviceUnit) {
+
+        final String bundle = serviceUnit.toString();
+
+        CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
+            final CompletableFuture<Optional<String>> owner;
+            // Assign the bundle to channel owner if is internal topic, to avoid circular references.
+            if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+                owner = serviceUnitStateChannel.getChannelOwnerAsync();
+            } else {
+                owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+                    // If the bundle not assign yet, select and publish assign event to channel.
+                    if (broker.isEmpty()) {
+                        return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+                            if (brokerOpt.isPresent()) {
+                                log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
+                                return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+                            } else {
+                                throw new IllegalStateException(
+                                        "Failed to discover(select) the new owner broker for bundle: " + bundle);

Review Comment:
   nit: discover(select)  -> select



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();

Review Comment:
   Throw UnsupportedOperationException.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java:
##########
@@ -143,6 +155,11 @@ static LoadManager create(final PulsarService pulsar) {
                 final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
                 casted.initialize(pulsar);
                 return casted;
+            } else if (loadManagerInstance instanceof ExtensibleLoadManagerImpl) {

Review Comment:
   nit : can we compare against the interface, `(loadManagerInstance instanceof ExtensibleLoadManager) ` to be consistent with the other cases?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        return null;
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        // No-op.
+    }
+
+    @Override
+    public void writeLoadReportOnZookeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.
+    }
+
+    @Override
+    public void writeResourceQuotasToZooKeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.
+    }
+
+    @Override
+    public List<Metrics> getLoadBalancingMetrics() {
+        return null;

Review Comment:
   Add a TODO comment here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        return null;

Review Comment:
   Throw UnsupportedOperationException.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private List<BrokerFilter> brokerFilterPipeline;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+            if (brokers.isEmpty()) {
+                return Optional.empty();
+            }
+            return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+        };
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        if (this.started.get()) {
+            return;
+        }
+        this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+        this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+        this.brokerRegistry.start();
+        this.serviceUnitStateChannel.start();
+
+        // TODO: Start the load data store.
+
+        this.context = LoadManagerContextImpl.builder()
+                .configuration(conf)
+                .brokerRegistry(brokerRegistry)
+                .brokerLoadDataStore(null)
+                .topBundleLoadDataStore(null).build();
+        // TODO: Start load data reporter.
+
+        // TODO: Start unload scheduler and bundle split scheduler
+
+        this.started.set(true);
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.conf = pulsar.getConfiguration();
+    }
+
+    @Override
+    public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
+                                                                ServiceUnitId serviceUnit) {
+
+        final String bundle = serviceUnit.toString();
+
+        CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
+            final CompletableFuture<Optional<String>> owner;
+            // Assign the bundle to channel owner if is internal topic, to avoid circular references.
+            if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+                owner = serviceUnitStateChannel.getChannelOwnerAsync();
+            } else {
+                owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+                    // If the bundle not assign yet, select and publish assign event to channel.
+                    if (broker.isEmpty()) {
+                        return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+                            if (brokerOpt.isPresent()) {
+                                log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
+                                return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+                            } else {
+                                throw new IllegalStateException(
+                                        "Failed to discover(select) the new owner broker for bundle: " + bundle);
+                            }
+                        });
+                    }
+                    // Already assigned, return it.
+                    return CompletableFuture.completedFuture(broker);
+                });
+            }
+
+            return owner.thenCompose(broker -> {
+                if (broker.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(broker.get());
+            }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+                if (brokerLookupData.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(brokerLookupData);
+            }));
+        });
+        future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+        return future;
+    }
+
+    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
+        BrokerRegistry brokerRegistry = getBrokerRegistry();
+        return brokerRegistry.getAvailableBrokerLookupDataAsync()
+                .thenCompose(availableBrokers -> {
+                    // TODO: Support isolation policies
+                    LoadManagerContext context = this.getContext();
+
+                    // Filter out brokers that do not meet the rules.
+                    List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
+                    Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);
+                    for (final BrokerFilter filter : filterPipeline) {
+                        try {
+                            filter.filter(availableBrokerCandidates, context);
+                        } catch (BrokerFilterException e) {
+                            availableBrokerCandidates = availableBrokers;
+                        }
+                    }
+                    if (availableBrokerCandidates.isEmpty()) {
+                        return CompletableFuture.completedFuture(Optional.empty());
+                    }
+                    ArrayList<String> candidateBrokers = new ArrayList<>(availableBrokerCandidates.keySet());
+
+                    return CompletableFuture.completedFuture(
+                            getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
+                });
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
+        final String bundle = bundleUnit.toString();
+        CompletableFuture<Optional<String>> owner;
+        if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+            owner = serviceUnitStateChannel.getChannelOwnerAsync();
+        } else {
+            owner = serviceUnitStateChannel.getOwnerAsync(bundle);
+        }
+
+        return owner.thenApply(broker -> brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+    }
+
+    @Override
+    public void close() throws PulsarServerException {

Review Comment:
   Like `start()` this can still allow multiple threads can `close()` on this obj.
   I think we can simply make this synchronized too.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        return null;
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        // No-op.

Review Comment:
   Throw UnsupportedOperationException.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        return null;
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        // No-op.
+    }
+
+    @Override
+    public void writeLoadReportOnZookeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.
+    }
+
+    @Override
+    public void writeResourceQuotasToZooKeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.
+    }
+
+    @Override
+    public List<Metrics> getLoadBalancingMetrics() {
+        return null;
+    }
+
+    @Override
+    public void doLoadShedding() {
+        // No-op.

Review Comment:
   Throw UnsupportedOperationException.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -171,11 +172,21 @@ public void initialize() {
         }
     }
 
+    public boolean isExtensibleLoadManager(){
+        return loadManager.get() instanceof ExtensibleLoadManagerWrapper;
+    }
+
     public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
         long startTime = System.nanoTime();
 
         CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
-                .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
+                .thenCompose(bundle -> {
+                    if (isExtensibleLoadManager()) {

Review Comment:
   I was working on these NamespaceService change as per `pluggable new broker load balancer` work.
   
   Do you want to keep this change here in this PR?
   
   I think we need to add NamespaceService unit tests to track this variation.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -242,11 +242,12 @@ public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
                     // TODO: discard this protocol prefix removal
                     //  by a util func that returns lookupServiceAddress(serviceUrl)
                     if (leader.isPresent()) {
-                        String broker = leader.get().getServiceUrl();
-                        broker = broker.substring(broker.lastIndexOf('/') + 1);
-                        return Optional.of(broker);
+                        return Optional.of(leader.get().getServiceUrl());

Review Comment:
   Do we keep the prefix here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private List<BrokerFilter> brokerFilterPipeline;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+            if (brokers.isEmpty()) {
+                return Optional.empty();
+            }
+            return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+        };
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        if (this.started.get()) {
+            return;
+        }
+        this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+        this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+        this.brokerRegistry.start();
+        this.serviceUnitStateChannel.start();
+
+        // TODO: Start the load data store.
+
+        this.context = LoadManagerContextImpl.builder()
+                .configuration(conf)
+                .brokerRegistry(brokerRegistry)
+                .brokerLoadDataStore(null)
+                .topBundleLoadDataStore(null).build();
+        // TODO: Start load data reporter.
+
+        // TODO: Start unload scheduler and bundle split scheduler
+
+        this.started.set(true);
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.conf = pulsar.getConfiguration();
+    }
+
+    @Override
+    public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
+                                                                ServiceUnitId serviceUnit) {
+
+        final String bundle = serviceUnit.toString();
+
+        CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
+            final CompletableFuture<Optional<String>> owner;
+            // Assign the bundle to channel owner if is internal topic, to avoid circular references.
+            if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+                owner = serviceUnitStateChannel.getChannelOwnerAsync();
+            } else {
+                owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+                    // If the bundle not assign yet, select and publish assign event to channel.
+                    if (broker.isEmpty()) {
+                        return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+                            if (brokerOpt.isPresent()) {
+                                log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
+                                return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+                            } else {
+                                throw new IllegalStateException(
+                                        "Failed to discover(select) the new owner broker for bundle: " + bundle);
+                            }
+                        });
+                    }
+                    // Already assigned, return it.
+                    return CompletableFuture.completedFuture(broker);
+                });
+            }
+
+            return owner.thenCompose(broker -> {
+                if (broker.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(broker.get());
+            }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+                if (brokerLookupData.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(brokerLookupData);
+            }));
+        });
+        future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+        return future;
+    }
+
+    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
+        BrokerRegistry brokerRegistry = getBrokerRegistry();
+        return brokerRegistry.getAvailableBrokerLookupDataAsync()
+                .thenCompose(availableBrokers -> {
+                    // TODO: Support isolation policies
+                    LoadManagerContext context = this.getContext();
+
+                    // Filter out brokers that do not meet the rules.
+                    List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
+                    Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);
+                    for (final BrokerFilter filter : filterPipeline) {
+                        try {
+                            filter.filter(availableBrokerCandidates, context);
+                        } catch (BrokerFilterException e) {
+                            availableBrokerCandidates = availableBrokers;
+                        }
+                    }
+                    if (availableBrokerCandidates.isEmpty()) {
+                        return CompletableFuture.completedFuture(Optional.empty());
+                    }
+                    ArrayList<String> candidateBrokers = new ArrayList<>(availableBrokerCandidates.keySet());

Review Comment:
   Isn't availableBrokerCandidates already copied?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) {
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                .get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        return null;
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        // No-op.
+    }
+
+    @Override
+    public void writeLoadReportOnZookeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.
+    }
+
+    @Override
+    public void writeResourceQuotasToZooKeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will automatically write.
+    }
+
+    @Override
+    public List<Metrics> getLoadBalancingMetrics() {
+        return null;
+    }
+
+    @Override
+    public void doLoadShedding() {
+        // No-op.
+    }
+
+    @Override
+    public void doNamespaceBundleSplit() {
+        // No-op.

Review Comment:
   Throw UnsupportedOperationException.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private List<BrokerFilter> brokerFilterPipeline;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerSelectionStrategy = (brokers, bundle, context) -> {
+            if (brokers.isEmpty()) {
+                return Optional.empty();
+            }
+            return Optional.of(brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())));
+        };
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        if (this.started.get()) {
+            return;
+        }
+        this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+        this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+        this.brokerRegistry.start();
+        this.serviceUnitStateChannel.start();
+
+        // TODO: Start the load data store.
+
+        this.context = LoadManagerContextImpl.builder()
+                .configuration(conf)
+                .brokerRegistry(brokerRegistry)
+                .brokerLoadDataStore(null)
+                .topBundleLoadDataStore(null).build();
+        // TODO: Start load data reporter.
+
+        // TODO: Start unload scheduler and bundle split scheduler
+
+        this.started.set(true);
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.conf = pulsar.getConfiguration();
+    }
+
+    @Override
+    public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
+                                                                ServiceUnitId serviceUnit) {
+
+        final String bundle = serviceUnit.toString();
+
+        CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
+            final CompletableFuture<Optional<String>> owner;
+            // Assign the bundle to channel owner if is internal topic, to avoid circular references.
+            if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+                owner = serviceUnitStateChannel.getChannelOwnerAsync();
+            } else {
+                owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+                    // If the bundle not assign yet, select and publish assign event to channel.
+                    if (broker.isEmpty()) {
+                        return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+                            if (brokerOpt.isPresent()) {
+                                log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
+                                return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
+                            } else {
+                                throw new IllegalStateException(
+                                        "Failed to discover(select) the new owner broker for bundle: " + bundle);
+                            }
+                        });
+                    }
+                    // Already assigned, return it.
+                    return CompletableFuture.completedFuture(broker);
+                });
+            }
+
+            return owner.thenCompose(broker -> {
+                if (broker.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(broker.get());
+            }).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+                if (brokerLookupData.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(brokerLookupData);
+            }));
+        });
+        future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+        return future;
+    }
+
+    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
+        BrokerRegistry brokerRegistry = getBrokerRegistry();
+        return brokerRegistry.getAvailableBrokerLookupDataAsync()
+                .thenCompose(availableBrokers -> {
+                    // TODO: Support isolation policies
+                    LoadManagerContext context = this.getContext();
+
+                    // Filter out brokers that do not meet the rules.
+                    List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
+                    Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);

Review Comment:
   Isn't availableBrokers already copied from the cache?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org