You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2022/12/02 07:44:30 UTC

[pulsar] branch master updated: [improve][broker] PIP-192: Define new load manager base interfaces (#18084)

This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fd9444c4e2a [improve][broker] PIP-192: Define new load manager base interfaces (#18084)
fd9444c4e2a is described below

commit fd9444c4e2a7bd68b7743a93a45ab711fd144016
Author: Kai Wang <kw...@apache.org>
AuthorDate: Fri Dec 2 15:44:21 2022 +0800

    [improve][broker] PIP-192: Define new load manager base interfaces (#18084)
    
    Master Issue: #16691
    
    ### Motivation
    
    We will start raising PRs to implement PIP-192, https://github.com/apache/pulsar/issues/16691
    
    ### Modifications
    
    The PR adds base classes for the new broker load balance project and does not integrate with the existing load balance logic. This PR should not impact the existing broker load balance behavior.
    
    For the pip-192 project, this PR
    
    * defines the base interface under `org.apache.pulsar.broker.loadbalance.extensible` package.
    * defines this `BrokerRegistry` public interface and its expected behaviors.
    * defines `BrokerFilter` interfaces.
    * defines `LoadDataReporter` interfaces.
    * defines `NamespaceBundleSplitStrategy` interfaces.
    * defines `LoadManagerScheduler` interfaces.
    * defines `NamespaceUnloadStrategy` interfaces.
    * defines `LoadDataStore` interfaces.
    * defines `ExtensibleLoadManager` interfaces.
    * defines `LoadManagerContext` interfaces.
    * defines `BrokerLoadData` and `BrokerLookupData` data classes.
---
 .../loadbalance/extensions/BrokerRegistry.java     |  95 ++++++++++++++++++
 .../extensions/ExtensibleLoadManager.java          |  73 ++++++++++++++
 .../loadbalance/extensions/LoadManagerContext.java |  50 ++++++++++
 .../extensions/data/BrokerLoadData.java            | 111 +++++++++++++++++++++
 .../extensions/data/BrokerLookupData.java          |  73 ++++++++++++++
 .../extensions/data/TopBundlesLoadData.java        |  58 +++++++++++
 .../loadbalance/extensions/data/package-info.java  |  19 ++++
 .../extensions/filter/BrokerFilter.java            |  44 ++++++++
 .../extensions/filter/package-info.java            |  19 ++++
 .../loadbalance/extensions/package-info.java       |  19 ++++
 .../extensions/reporter/LoadDataReporter.java      |  41 ++++++++
 .../extensions/reporter/package-info.java          |  19 ++++
 .../extensions/scheduler/LoadManagerScheduler.java |  44 ++++++++
 .../scheduler/NamespaceBundleSplitStrategy.java    |  39 ++++++++
 .../scheduler/NamespaceUnloadStrategy.java         |  45 +++++++++
 .../extensions/scheduler/package-info.java         |  19 ++++
 .../extensions/store/LoadDataStore.java            |  65 ++++++++++++
 .../extensions/store/LoadDataStoreException.java   |  42 ++++++++
 .../loadbalance/extensions/store/package-info.java |  19 ++++
 .../extensions/data/BrokerLoadDataTest.java        |  51 ++++++++++
 .../extensions/data/BrokerLookupDataTest.java      |  62 ++++++++++++
 .../extensions/data/TopBundlesLoadDataTest.java    |  59 +++++++++++
 .../data/loadbalancer/LocalBrokerData.java         |   2 +-
 23 files changed, 1067 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
new file mode 100644
index 00000000000..94ac87f7cf7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Get the broker lookup data.
+     *
+     * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
+     */
+    void forEach(BiConsumer<String, BrokerLookupData> action);
+
+    /**
+     * Listen the broker register change.
+     *
+     * @param listener Key is lookup service address{@link BrokerRegistry#getLookupServiceAddress()}
+     *                 Value is notification type.
+     */
+    void listen(BiConsumer<String, NotificationType> listener);
+
+    /**
+     * Close the broker registry.
+     *
+     * @throws PulsarServerException if it fails to close the broker registry.
+     */
+    @Override
+    void close() throws PulsarServerException;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
new file mode 100644
index 00000000000..bb66bf731f4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for service unit (e.g. bundle) through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager extends Closeable {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming service unit (e.g. bundle) selects the appropriate broker through strategies.
+     *
+     * @param topic The optional topic, some method won't provide topic var in this param
+     *              (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
+     *              So the topic is optional.
+     * @param serviceUnit service unit (e.g. bundle).
+     * @return The broker lookup data.
+     */
+    CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
+
+    /**
+     * Close the load manager.
+     *
+     * @throws PulsarServerException if it fails to stop the load manager.
+     */
+    void close() throws PulsarServerException;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContext.java
new file mode 100644
index 00000000000..27878d64b85
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+
+/**
+ * The filter and load balance context, use for delivering context between filter, scheduler and strategy.
+ */
+public interface LoadManagerContext {
+
+    /**
+     * The broker configuration.
+     */
+    ServiceConfiguration brokerConfiguration();
+
+    /**
+     * Broker load data store, each component use the context to access the load data store.
+     */
+    LoadDataStore<BrokerLoadData> brokerLoadDataStore();
+
+    /**
+     * Top bundle load data store.
+     */
+    LoadDataStore<TopBundlesLoadData> topBundleLoadDataStore();
+
+    /**
+     * The broker register.
+     */
+    BrokerRegistry brokerRegistry();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
new file mode 100644
index 00000000000..fbb5093939e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    public BrokerLoadData() {
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     */
+    public void update(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage);
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    public double getMaxResourceUsage() {
+        return LocalBrokerData.max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return LocalBrokerData.max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
new file mode 100644
index 00000000000..504ae13003e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+/**
+ * Defines the information required to broker lookup.
+ */
+public record BrokerLookupData (String webServiceUrl,
+                                String webServiceUrlTls,
+                                String pulsarServiceUrl,
+                                String pulsarServiceUrlTls,
+                                Map<String, AdvertisedListener> advertisedListeners,
+                                Map<String, String> protocols,
+                                boolean persistentTopicsEnabled,
+                                boolean nonPersistentTopicsEnabled,
+                                String brokerVersion) implements ServiceLookupData {
+    @Override
+    public String getWebServiceUrl() {
+        return this.webServiceUrl();
+    }
+
+    @Override
+    public String getWebServiceUrlTls() {
+        return this.webServiceUrlTls();
+    }
+
+    @Override
+    public String getPulsarServiceUrl() {
+        return this.pulsarServiceUrl();
+    }
+
+    @Override
+    public String getPulsarServiceUrlTls() {
+        return this.pulsarServiceUrlTls();
+    }
+
+    @Override
+    public Map<String, String> getProtocols() {
+        return this.protocols();
+    }
+
+    @Override
+    public Optional<String> getProtocol(String protocol) {
+        return Optional.ofNullable(this.protocols().get(protocol));
+    }
+
+    public LookupResult toLookupResult() {
+        return new LookupResult(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls,
+                LookupResult.Type.BrokerUrl, false);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadData.java
new file mode 100644
index 00000000000..9c34b40ff04
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+
+/**
+ * Defines the information of top bundles load data.
+ */
+@Getter
+public class TopBundlesLoadData {
+
+    private final List<BundleLoadData> topBundlesLoadData;
+
+    public record BundleLoadData(String bundleName, NamespaceBundleStats stats) {
+        public BundleLoadData {
+            Objects.requireNonNull(bundleName);
+        }
+    }
+
+    private TopBundlesLoadData(List<BundleLoadData> bundleStats, int topK) {
+        topBundlesLoadData = bundleStats
+                .stream()
+                .sorted((o1, o2) -> o2.stats().compareTo(o1.stats()))
+                .limit(topK)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Give full bundle stats, and return the top K bundle stats.
+     *
+     * @param bundleStats full bundle stats.
+     * @param topK Top K bundles.
+     */
+    public static TopBundlesLoadData of(List<BundleLoadData> bundleStats, int topK) {
+        return new TopBundlesLoadData(bundleStats, topK);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/package-info.java
new file mode 100644
index 00000000000..ae0dc67b25b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
new file mode 100644
index 00000000000..4adc6aa1ce4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.List;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+
+/**
+ * Filter out unqualified Brokers, which are not entered into LoadBalancer for decision-making.
+ */
+public interface BrokerFilter {
+
+    /**
+     * The broker filter name.
+     */
+    String name();
+
+    /**
+     * Filter out unqualified brokers based on implementation.
+     *
+     * @param brokers The full brokers.
+     * @param context The load manager context.
+     * @return Filtered broker list.
+     */
+    List<String> filter(List<String> brokers, LoadManagerContext context) throws BrokerFilterException;
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/package-info.java
new file mode 100644
index 00000000000..dc8d77f9823
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/package-info.java
new file mode 100644
index 00000000000..3318ecaa180
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/LoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/LoadDataReporter.java
new file mode 100644
index 00000000000..ed620c02985
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/LoadDataReporter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.reporter;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The load data reporter, it publish the load data to load data storage.
+ *
+ */
+public interface LoadDataReporter<T> {
+
+    /**
+     * Generate the load data.
+     *
+     * @return Load data.
+     */
+    T generateLoadData();
+
+    /**
+     * Report the load data to load data store async.
+     */
+    CompletableFuture<Void> reportAsync(boolean force);
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/package-info.java
new file mode 100644
index 00000000000..03de544247a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.reporter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/LoadManagerScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/LoadManagerScheduler.java
new file mode 100644
index 00000000000..d3b5c72aa50
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/LoadManagerScheduler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import java.io.Closeable;
+
+/**
+ * The base interface to schedule execute task in load manager.
+ * Example: The namespace bundle split or unload should schedule running.
+ */
+public interface LoadManagerScheduler extends Closeable {
+
+    /**
+     * Execute the schedule task.
+     */
+    void execute();
+
+    /**
+     * Start the load manager scheduler.
+     */
+    void start();
+
+    /**
+     * Close the load manager scheduler.
+     */
+    @Override
+    void close();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java
new file mode 100644
index 00000000000..88bd7f0b087
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import java.util.Set;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+
+/**
+ * Determines which bundles should be split based on various thresholds.
+ *
+ * Migrate from {@link org.apache.pulsar.broker.loadbalance.BundleSplitStrategy}
+ */
+public interface NamespaceBundleSplitStrategy {
+
+    /**
+     * Determines which bundles, if any, should be split.
+     *
+     * @param context The context used for decisions.
+     * @return A set of the bundles that should be split.
+     */
+    Set<Split> findBundlesToSplit(LoadManagerContext context);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
new file mode 100644
index 00000000000..0942dc9522e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+
+/**
+ * The namespace unload strategy.
+ * Used to determine the criteria for unloading bundles.
+ *
+ * Migrate from {@link org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy}
+ */
+public interface NamespaceUnloadStrategy {
+
+    /**
+     * Recommend that all the returned bundles be unloaded.
+     *
+     * @param context The context used for decisions.
+     * @param recentlyUnloadedBundles
+     *           The recently unloaded bundles.
+     * @return A list of the bundles that should be unloaded.
+     */
+    List<Unload> findBundlesForUnloading(LoadManagerContext context,
+                                         Map<String, Long> recentlyUnloadedBundles);
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/package-info.java
new file mode 100644
index 00000000000..2f579b6a2a7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
new file mode 100644
index 00000000000..174e656167d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {
+
+    /**
+     * Async push load data to store.
+     *
+     * @param key The load data key.
+     * @param loadData The load data.
+     */
+    CompletableFuture<Void> pushAsync(String key, T loadData);
+
+    /**
+     * Get load data by key.
+     *
+     * @param key The load data key.
+     */
+    Optional<T> get(String key);
+
+    /**
+     * Performs the given action for each entry in this map until all entries
+     * have been processed or the action throws an exception.
+     *
+     * @param action The action to be performed for each entry
+     */
+    void forEach(BiConsumer<String, T> action);
+
+    /**
+     * Returns a Set view of the mappings contained in this map.
+     *
+     * @return a set view of the mappings contained in this map
+     */
+    Set<Map.Entry<String, T>> entrySet();
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreException.java
new file mode 100644
index 00000000000..510c490f579
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.IOException;
+
+public class LoadDataStoreException extends IOException {
+
+    public LoadDataStoreException(Throwable t) {
+        super(t);
+    }
+
+    public LoadDataStoreException(String msg) {
+        super(msg);
+    }
+
+    public LoadDataStoreException(String msg, Throwable t) {
+        super(msg, t);
+    }
+
+    public static class InvalidPathException extends LoadDataStoreException {
+        public InvalidPathException(String path) {
+            super("Path(" + path + ") is invalid");
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/package-info.java
new file mode 100644
index 00000000000..70d830f77be
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
new file mode 100644
index 00000000000..58805af922b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link BrokerLoadData}.
+ * TODO: Add more units test.
+ */
+@Test(groups = "broker")
+public class BrokerLoadDataTest {
+
+    @Test
+    public void testMaxResourceUsage() {
+        BrokerLoadData data = new BrokerLoadData();
+        data.setCpu(new ResourceUsage(1.0, 100.0));
+        data.setMemory(new ResourceUsage(800.0, 200.0));
+        data.setDirectMemory(new ResourceUsage(2.0, 100.0));
+        data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
+        data.setBandwidthOut(new ResourceUsage(4.0, 100.0));
+
+        double epsilon = 0.00001;
+        double weight = 0.5;
+        // skips memory usage
+        assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);
+
+        assertEquals(
+                data.getMaxResourceUsageWithWeight(
+                        weight, weight, weight, weight, weight), 2.0, epsilon);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
new file mode 100644
index 00000000000..b5d2d63a311
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupDataTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Test(groups = "broker")
+public class BrokerLookupDataTest {
+
+    @Test
+    public void testConstructors() {
+        String webServiceUrl = "http://localhost:8080";
+        String webServiceUrlTls = "https://localhoss:8081";
+        String pulsarServiceUrl = "pulsar://localhost:6650";
+        String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
+        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+        Map<String, String> protocols = new HashMap<>(){{
+            put("kafka", "9092");
+        }};
+        BrokerLookupData lookupData = new BrokerLookupData(
+                webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+                pulsarServiceUrlTls, advertisedListeners, protocols, true, true, "3.0");
+        Assert.assertEquals(webServiceUrl, lookupData.webServiceUrl());
+        Assert.assertEquals(webServiceUrlTls, lookupData.webServiceUrlTls());
+        Assert.assertEquals(pulsarServiceUrl, lookupData.pulsarServiceUrl());
+        Assert.assertEquals(pulsarServiceUrlTls, lookupData.pulsarServiceUrlTls());
+        Assert.assertEquals(Optional.of("9092"), lookupData.getProtocol("kafka"));
+        Assert.assertEquals(Optional.empty(), lookupData.getProtocol("echo"));
+        Assert.assertTrue(lookupData.persistentTopicsEnabled());
+        Assert.assertTrue(lookupData.nonPersistentTopicsEnabled());
+        Assert.assertEquals("3.0", lookupData.brokerVersion());
+
+
+        LookupResult lookupResult = lookupData.toLookupResult();
+        Assert.assertEquals(webServiceUrl, lookupResult.getLookupData().getHttpUrl());
+        Assert.assertEquals(webServiceUrlTls, lookupResult.getLookupData().getHttpUrlTls());
+        Assert.assertEquals(pulsarServiceUrl, lookupResult.getLookupData().getBrokerUrl());
+        Assert.assertEquals(pulsarServiceUrlTls, lookupResult.getLookupData().getBrokerUrlTls());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadDataTest.java
new file mode 100644
index 00000000000..06c232d3219
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/TopBundlesLoadDataTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+@Test(groups = "broker")
+public class TopBundlesLoadDataTest {
+
+    @Test
+    public void testTopBundlesLoadData() {
+        List<TopBundlesLoadData.BundleLoadData> bundleStats = new ArrayList<>();
+        NamespaceBundleStats stats1 = new NamespaceBundleStats();
+        stats1.msgRateIn = 100;
+        bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-1", stats1));
+
+        NamespaceBundleStats stats2 = new NamespaceBundleStats();
+        stats2.msgRateIn = 10000;
+        bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-2", stats2));
+
+        NamespaceBundleStats stats3 = new NamespaceBundleStats();
+        stats3.msgRateIn = 100000;
+        bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-3", stats3));
+
+        NamespaceBundleStats stats4 = new NamespaceBundleStats();
+        stats4.msgRateIn = 10;
+        bundleStats.add(new TopBundlesLoadData.BundleLoadData("bundle-4", stats4));
+
+        TopBundlesLoadData topBundlesLoadData = TopBundlesLoadData.of(bundleStats, 3);
+        var top0 = topBundlesLoadData.getTopBundlesLoadData().get(0);
+        var top1 = topBundlesLoadData.getTopBundlesLoadData().get(1);
+        var top2 = topBundlesLoadData.getTopBundlesLoadData().get(2);
+
+        assertEquals(top0.bundleName(), "bundle-3");
+        assertEquals(top1.bundleName(), "bundle-2");
+        assertEquals(top2.bundleName(), "bundle-1");
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 904b2d14368..030ecc63be5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -258,7 +258,7 @@ public class LocalBrokerData implements LoadManagerReport {
                 bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
     }
 
-    private static double max(double... args) {
+    public static double max(double... args) {
         double max = Double.NEGATIVE_INFINITY;
 
         for (double d : args) {