You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/18 16:58:14 UTC

[GitHub] [pulsar] heesung-sn commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

heesung-sn commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998433901


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.
+     *
+     * @param broker The load-balancer path.
+     */
+    Optional<BrokerLookupData> lookup(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress
+     */
+    void forEach(BiConsumer<String, BrokerLookupData> action);
+
+    /**
+     * Listen the broker register change.
+     */
+    void listen(BiConsumer<String, NotificationType> listener);
+
+    /**
+     * Close the broker registry.
+     */
+    void close() throws Exception;

Review Comment:
   should we throw `PulsarServerException` instead?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;

Review Comment:
   Please note that when we serialize this object to JSON (when publishing this object to the `BrokerLoadDataStore`), we need to ignore the majority of these parameters (e.g. bundles and lastStats) to minimize the payload size.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadDataTest.java:
##########
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class BrokerLoadDataTest {

Review Comment:
   I think we need to add more unit tests for the public methods in this class.
   
   Please add a `// TODO` comment so that we do not forget this work.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.
+     *
+     * @param broker The load-balancer path.

Review Comment:
   nit: can we specify the broker param format with an example?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.

Review Comment:
   nit: can we explain more about the `lookup service address` here? (format specification and example)



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.

Review Comment:
   nit: bundle selects -> service unit (e.g. bundle) selects.
   
   I think we could use a generic term, `service unit` instead of bundle in our docs and variables.
   Please update this in the other places too.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    Optional<String> discover(ServiceUnitId serviceUnit);

Review Comment:
   In the PoC, we only call `discover` inside `assign`. Do we need to expose this discover in this interface?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    Optional<String> discover(ServiceUnitId serviceUnit);
+
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    CompletableFuture<Optional<LookupResult>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

Review Comment:
   nit: Ideally, I think we should expect the `topic` to be non-optional. I remember this was optional in the PoC because some of the callers do not have the topic variable in their stack, for example:
   `private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {`
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/models/Split.java:
##########
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.models;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Defines the information required for a bundle split.
+ */
+public record Split(String bundle, String sourceBroker, Map<String, Optional<String>> splitBundleToDestBroker) {

Review Comment:
   I see that these `Split` and `Unload` records have been added in another PR. Hopefully, this does not conflict. https://github.com/apache/pulsar/pull/18079



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/TopBundlesLoadData.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+
+@Getter
+public class TopBundlesLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/top-bundle-load-data";
+
+    @JsonProperty("top_bundles_load_data")

Review Comment:
   Let's remove this JsonProperty("top_bundles_load_data") and confirm if this is really required.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/store/LoadDataStore.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {
+
+    /**
+     * Push load data to store.
+     *
+     * @param key
+     *           The load data key.
+     * @param loadData
+     *           The load data.
+     */
+    void push(String key, T loadData) throws LoadDataStoreException;
+
+    /**
+     * Async push load data to store.
+     *
+     * @param key
+     *           The load data key.
+     * @param loadData
+     *           The load data.
+     */
+    CompletableFuture<Void> pushAsync(String key, T loadData);

Review Comment:
   I think we mostly need pushAsync() and get().
   
   Can we start with the required ones first and add the others later if needed?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrated from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData},
+ * with the lookup data removed; see {@link BrokerLookupData}.
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;
+
+    private int numTopics;
+    private int numBundles;
+    private int numConsumers;
+    private int numProducers;
+
+    // All bundles belonging to this broker.
+    private Set<String> bundles;
+
+    // The bundles gained since the last invocation of update.
+    private Set<String> lastBundleGains;
+
+    // The bundles lost since the last invocation of update.
+    private Set<String> lastBundleLosses;
+
+    /**
+     * Creates an empty load report: every resource gauge is a fresh {@link ResourceUsage},
+     * the bundle sets and the per-bundle stats map are empty, and {@code lastUpdate} is
+     * stamped with the current wall-clock time.
+     */
+    public BrokerLoadData() {
+        // Bookkeeping: creation timestamp and the (initially empty) per-bundle stats.
+        lastUpdate = System.currentTimeMillis();
+        lastStats = new ConcurrentHashMap<>();
+
+        // System resource gauges.
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+
+        // Gauges derived from message throughput.
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+
+        // Bundle ownership and its deltas.
+        bundles = new HashSet<>();
+        lastBundleGains = new HashSet<>();
+        lastBundleLosses = new HashSet<>();
+    }
+
+    /**
+     * Refreshes this BrokerLoadData from a system resource snapshot and per-bundle stats.
+     *
+     * <p>The five resource gauges are replaced by the ones held in {@code systemResourceUsage},
+     * the aggregated rates/counts and the bundle gain/loss sets are recomputed from
+     * {@code bundleStats}, and {@code bundleStats} itself (the same map instance, not a copy)
+     * becomes {@code lastStats}.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, direct memory, bandwidth in and out).
+     * @param bundleStats
+     *            Stats for each bundle, keyed by bundle name.
+     */
+    public void update(final SystemResourceUsage systemResourceUsage,
+                       final Map<String, NamespaceBundleStats> bundleStats) {
+        updateSystemResourceUsage(
+                systemResourceUsage.cpu,
+                systemResourceUsage.memory,
+                systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn,
+                systemResourceUsage.bandwidthOut);
+        updateBundleData(bundleStats);
+        lastStats = bundleStats;
+    }
+
+    /**
+     * Refreshes this BrokerLoadData from another instance.
+     *
+     * <p>The resource gauges and the {@code lastStats} map of {@code other} are adopted by
+     * reference, and the aggregated rates/counts and bundle gain/loss sets are recomputed
+     * from that map.
+     *
+     * @param other
+     *            BrokerLoadData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        final Map<String, NamespaceBundleStats> otherStats = other.lastStats;
+        updateSystemResourceUsage(
+                other.cpu,
+                other.memory,
+                other.directMemory,
+                other.bandwidthIn,
+                other.bandwidthOut);
+        updateBundleData(otherStats);
+        lastStats = otherStats;
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Replace each stored gauge with the given one. The references are stored as-is (no copy).
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        // Network gauges.
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+        // Compute and memory gauges.
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+    }
+
+    // Recompute the aggregated message rates, throughput and the topic / bundle / consumer /
+    // producer counts from the given per-bundle stats, and record which bundles were lost or
+    // gained relative to the bundle set tracked so far.
+    private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStats) {
+        // The rate and throughput totals are accumulated directly in the fields, so reset them first.
+        msgRateIn = 0;
+        msgRateOut = 0;
+        msgThroughputIn = 0;
+        msgThroughputOut = 0;
+
+        // A tracked bundle that is missing from the new stats has been lost.
+        for (final Iterator<String> tracked = bundles.iterator(); tracked.hasNext();) {
+            final String name = tracked.next();
+            if (bundleStats.containsKey(name)) {
+                continue;
+            }
+            lastBundleLosses.add(name);
+            tracked.remove();
+        }
+
+        int topicCount = 0;
+        int bundleCount = 0;
+        int consumerCount = 0;
+        int producerCount = 0;
+        for (Map.Entry<String, NamespaceBundleStats> reported : bundleStats.entrySet()) {
+            final String name = reported.getKey();
+            final NamespaceBundleStats bundleLoad = reported.getValue();
+            // Set.add returns true only when the bundle was not tracked yet, i.e. it was gained.
+            if (bundles.add(name)) {
+                lastBundleGains.add(name);
+            }
+            msgThroughputIn += bundleLoad.msgThroughputIn;
+            msgThroughputOut += bundleLoad.msgThroughputOut;
+            msgRateIn += bundleLoad.msgRateIn;
+            msgRateOut += bundleLoad.msgRateOut;
+            topicCount += bundleLoad.topics;
+            bundleCount++;
+            consumerCount += bundleLoad.consumerCount;
+            producerCount += bundleLoad.producerCount;
+        }
+
+        // Publish the counts only after the whole map has been walked.
+        numTopics = topicCount;
+        numBundles = bundleCount;
+        numConsumers = consumerCount;
+        numProducers = producerCount;
+    }
+
+    /**
+     * Empties the recorded bundle gains and losses. The set of currently owned bundles
+     * is left untouched.
+     */
+    public void cleanDeltas() {
+        lastBundleLosses.clear();
+        lastBundleGains.clear();
+    }
+
+    /**
+     * Returns the result of the {@code max} helper (defined elsewhere in this class) over the
+     * cpu, direct memory, bandwidth-in and bandwidth-out usage percentages, divided by 100.
+     * Heap memory usage is not one of the inputs.
+     *
+     * @return the combined usage, scaled down from a percentage by a factor of 100.
+     */
+    public double getMaxResourceUsage() {
+        return max(
+                cpu.percentUsage(),
+                directMemory.percentUsage(),
+                bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    /**
+     * Multiplies each usage percentage (cpu, memory, direct memory, bandwidth in, bandwidth out)
+     * by its weight, passes the products to the {@code max} helper (defined elsewhere in this
+     * class) and divides the result by 100.
+     *
+     * @param cpuWeight          multiplier applied to the cpu usage percentage.
+     * @param memoryWeight       multiplier applied to the memory usage percentage.
+     * @param directMemoryWeight multiplier applied to the direct memory usage percentage.
+     * @param bandwidthInWeight  multiplier applied to the inbound bandwidth usage percentage.
+     * @param bandwidthOutWeight multiplier applied to the outbound bandwidth usage percentage.
+     * @return the combined weighted usage, scaled down from a percentage by a factor of 100.
+     */
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(
+                cpu.percentUsage() * cpuWeight,
+                memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight,
+                bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    /**
+     * Same weighted inputs as {@code getMaxResourceUsageWithWeight}, but combined by the
+     * {@code maxWithinLimit} helper with a limit of {@code 100.0}; the result is divided by 100.
+     * The helper is defined elsewhere in this class; presumably it ignores values above the
+     * limit (TODO confirm).
+     *
+     * @param cpuWeight          multiplier applied to the cpu usage percentage.
+     * @param memoryWeight       multiplier applied to the memory usage percentage.
+     * @param directMemoryWeight multiplier applied to the direct memory usage percentage.
+     * @param bandwidthInWeight  multiplier applied to the inbound bandwidth usage percentage.
+     * @param bandwidthOutWeight multiplier applied to the outbound bandwidth usage percentage.
+     * @return the combined weighted usage, scaled down from a percentage by a factor of 100.
+     */
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(
+                100.0d,
+                cpu.percentUsage() * cpuWeight,
+                memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight,
+                bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    // Convert the configured NIC speed override from Gbps to bytes per second; yield -1.0 when
+    // no override is configured.
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps()
+                .map(gbps -> gbps * gigaBitToByte)
+                .orElse(-1.0);
+    }
+
+    /**
+     * Returns the inbound message throughput as a {@link ResourceUsage} measured against the
+     * given limit. The instance is cached and rebuilt only when the throughput or the limit
+     * differs from the cached one.
+     *
+     * @param nicSpeedBytesInSec
+     *            The limit to measure the throughput against.
+     * @return the cached or freshly built usage for {@code msgThroughputIn}.
+     */
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        // Comparing the usage alone would keep serving a ResourceUsage built against a
+        // previous limit when only nicSpeedBytesInSec changed between calls.
+        if (msgThroughputInUsage.usage != msgThroughputIn
+                || msgThroughputInUsage.limit != nicSpeedBytesInSec) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    /**
+     * Returns the outbound message throughput as a {@link ResourceUsage} measured against the
+     * given limit. The instance is cached and rebuilt only when the throughput or the limit
+     * differs from the cached one.
+     *
+     * @param nicSpeedBytesInSec
+     *            The limit to measure the throughput against.
+     * @return the cached or freshly built usage for {@code msgThroughputOut}.
+     */
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        // Comparing the usage alone would keep serving a ResourceUsage built against a
+        // previous limit when only nicSpeedBytesInSec changed between calls.
+        if (msgThroughputOutUsage.usage != msgThroughputOut
+                || msgThroughputOutUsage.limit != nicSpeedBytesInSec) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight(),
+                getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithInResourceWeight(),

Review Comment:
   This change will require updates in BrokerConfiguration. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org