You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/18 07:43:31 UTC

[GitHub] [pulsar] Demogorgon314 opened a new pull request, #18084: PIP-192: Define new load manager base interfaces

Demogorgon314 opened a new pull request, #18084:
URL: https://github.com/apache/pulsar/pull/18084

   Master Issue: #16691
   
   ### Motivation
   
   We will start raising PRs to implement PIP-192, https://github.com/apache/pulsar/issues/16691
   
   ### Modifications
   
   The PR adds base classes for the new broker load balance project and does not integrate with the existing load balance logic. This PR should not impact the existing broker load balance behavior.
   
   For the pip-192 project, this PR
   
   * defines the base interface under `org.apache.pulsar.broker.loadbalance.extensible` package.
   * defines this `BrokerRegistry` public interface and its expected behaviors.
   * defines `BrokerFilter` interfaces.
   * defines `LoadDataReporter` interfaces.
   * defines `NamespaceBundleSplitStrategy` interfaces.
   * defines `LoadManagerScheduler` interfaces.
   * defines `NamespaceUnloadStrategy` interfaces.
   * defines `LoadDataStore` interfaces.
   * defines `ExtensibleLoadManager` interfaces.
   * defines `LoadManagerContext` interfaces.
   * defines `BrokerLoadData` and `BrokerLookupData` data classes.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/Demogorgon314/pulsar/pull/4
   
   <!--
   After opening this PR, the build in apache/pulsar will fail and instructions will
   be provided for opening a PR in the PR author's forked repository.
   
   apache/pulsar pull requests should be first tested in your own fork since the 
   apache/pulsar CI based on GitHub Actions has constrained resources and quota.
   GitHub Actions provides separate quota for pull requests that are executed in 
   a forked repository.
   
   The tests will be run in the forked repository until all PR review comments have
   been handled, the tests pass and the PR is approved by a reviewer.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025982341


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.

Review Comment:
   +1, I think we should not mention implementation details in API docs unless they are well explained. There is a bad case that `acknowledge` related APIs mentioned "pending ack". But what is "pending ack" is never explained.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025530690


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.
+     *
+     * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    Optional<BrokerLookupData> lookup(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
+     */
+    void forEach(BiConsumer<String, BrokerLookupData> action);
+
+    /**
+     * Listen the broker register change.
+     *
+     * @param listener Key is lookup service address{@link BrokerRegistry#getLookupServiceAddress()}
+     *                 Value is notification type.
+     */
+    void listen(BiConsumer<String, NotificationType> listener);
+
+    /**
+     * Close the broker registry.
+     *
+     * @throws PulsarServerException if it fails to close the broker registry.
+     */
+    void close() throws PulsarServerException;

Review Comment:
   Should we just extends `AutoCloseable` for all interfaces that have `close` method?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;

Review Comment:
   Is there any reason to follow the design of the existing APIs? I see the `PulsarServerException` will be thrown if ZK failed to disable the broker in `ModularLoadManager#disableBroker`. But the design is wrong, we should throw a more specific exception like `MetadataStoreException`.
   
   BTW, why is there an exception in `unregister`'s signature but no exception in `register`'s signature?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025946257


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {

Review Comment:
   https://github.com/apache/pulsar/pull/18084#discussion_r1025919136



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r999454823


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;
+
+    private int numTopics;
+    private int numBundles;
+    private int numConsumers;
+    private int numProducers;
+
+    // All bundles belonging to this broker.
+    private Set<String> bundles;
+
+    // The bundles gained since the last invocation of update.
+    private Set<String> lastBundleGains;
+
+    // The bundles lost since the last invocation of update.
+    private Set<String> lastBundleLosses;
+
+    public BrokerLoadData() {
+        lastStats = new ConcurrentHashMap<>();
+        lastUpdate = System.currentTimeMillis();
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+        bundles = new HashSet<>();
+        lastBundleGains = new HashSet<>();
+        lastBundleLosses = new HashSet<>();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     * @param bundleStats
+     *            The bundle stats retrieved from the Pulsar client.
+     */
+    public void update(final SystemResourceUsage systemResourceUsage,
+                       final Map<String, NamespaceBundleStats> bundleStats) {
+        updateSystemResourceUsage(systemResourceUsage);
+        updateBundleData(bundleStats);
+        lastStats = bundleStats;
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+        updateBundleData(other.lastStats);
+        lastStats = other.lastStats;
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    // Aggregate all message, throughput, topic count, bundle count, consumer
+    // count, and producer count across the
+    // given data. Also keep track of bundle gains and losses.
+    private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStats) {
+        msgRateIn = 0;
+        msgRateOut = 0;
+        msgThroughputIn = 0;
+        msgThroughputOut = 0;
+        int totalNumTopics = 0;
+        int totalNumBundles = 0;
+        int totalNumConsumers = 0;
+        int totalNumProducers = 0;
+        final Iterator<String> oldBundleIterator = bundles.iterator();
+        while (oldBundleIterator.hasNext()) {
+            final String bundle = oldBundleIterator.next();
+            if (!bundleStats.containsKey(bundle)) {
+                // If this bundle is in the old bundle set but not the new one,
+                // we lost it.
+                lastBundleLosses.add(bundle);
+                oldBundleIterator.remove();
+            }
+        }
+        for (Map.Entry<String, NamespaceBundleStats> entry : bundleStats.entrySet()) {
+            final String bundle = entry.getKey();
+            final NamespaceBundleStats stats = entry.getValue();
+            if (!bundles.contains(bundle)) {
+                // If this bundle is in the new bundle set but not the old one,
+                // we gained it.
+                lastBundleGains.add(bundle);
+                bundles.add(bundle);
+            }
+            msgThroughputIn += stats.msgThroughputIn;
+            msgThroughputOut += stats.msgThroughputOut;
+            msgRateIn += stats.msgRateIn;
+            msgRateOut += stats.msgRateOut;
+            totalNumTopics += stats.topics;
+            ++totalNumBundles;
+            totalNumConsumers += stats.consumerCount;
+            totalNumProducers += stats.producerCount;
+        }
+        numTopics = totalNumTopics;
+        numBundles = totalNumBundles;
+        numConsumers = totalNumConsumers;
+        numProducers = totalNumProducers;
+    }
+
+    public void cleanDeltas() {
+        lastBundleGains.clear();
+        lastBundleLosses.clear();
+    }
+
+    public double getMaxResourceUsage() {
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+                ? conf.getLoadBalancerOverrideBrokerNicSpeedGbps().get() * gigaBitToByte : -1.0;
+    }
+
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputInUsage.usage != msgThroughputIn) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputOutUsage.usage != msgThroughputOut) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight(),
+                getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithInResourceWeight(),

Review Comment:
   I see. Maybe we can add it when we need it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1319475343

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/18084?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#18084](https://codecov.io/gh/apache/pulsar/pull/18084?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5cb7b56) into [master](https://codecov.io/gh/apache/pulsar/commit/79750231051db849350e3d35dd3706320466acac?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7975023) will **decrease** coverage by `8.74%`.
   > The diff coverage is `59.37%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/18084/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/18084?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #18084      +/-   ##
   ============================================
   - Coverage     45.62%   36.87%   -8.75%     
   + Complexity    10075     1958    -8117     
   ============================================
     Files           697      208     -489     
     Lines         68024    14426   -53598     
     Branches       7293     1582    -5711     
   ============================================
   - Hits          31033     5319   -25714     
   + Misses        33413     8529   -24884     
   + Partials       3578      578    -3000     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `36.87% <59.37%> (-8.75%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/18084?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...nt/impl/PulsarClientImplementationBindingImpl.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1B1bHNhckNsaWVudEltcGxlbWVudGF0aW9uQmluZGluZ0ltcGwuamF2YQ==) | `72.41% <ø> (-0.47%)` | :arrow_down: |
   | [...ar/client/impl/conf/ConsumerConfigurationData.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL2NvbmYvQ29uc3VtZXJDb25maWd1cmF0aW9uRGF0YS5qYXZh) | `92.55% <ø> (+2.12%)` | :arrow_up: |
   | [...he/pulsar/client/impl/TypedMessageBuilderImpl.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1R5cGVkTWVzc2FnZUJ1aWxkZXJJbXBsLmphdmE=) | `30.76% <59.37%> (+3.56%)` | :arrow_up: |
   | [...he/pulsar/client/impl/PartitionedProducerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1BhcnRpdGlvbmVkUHJvZHVjZXJJbXBsLmphdmE=) | `30.34% <0.00%> (-5.13%)` | :arrow_down: |
   | [.../pulsar/client/impl/ProducerStatsRecorderImpl.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Byb2R1Y2VyU3RhdHNSZWNvcmRlckltcGwuamF2YQ==) | `84.04% <0.00%> (-0.62%)` | :arrow_down: |
   | [...va/org/apache/pulsar/client/impl/ProducerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Byb2R1Y2VySW1wbC5qYXZh) | `16.43% <0.00%> (-0.59%)` | :arrow_down: |
   | [...pulsar/proxy/server/ServiceChannelInitializer.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLXByb3h5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvcHJveHkvc2VydmVyL1NlcnZpY2VDaGFubmVsSW5pdGlhbGl6ZXIuamF2YQ==) | | |
   | [...ar/broker/protocol/ProtocolHandlerDefinitions.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9wcm90b2NvbC9Qcm90b2NvbEhhbmRsZXJEZWZpbml0aW9ucy5qYXZh) | | |
   | [...ce/schema/validator/StringSchemaDataValidator.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3NjaGVtYS92YWxpZGF0b3IvU3RyaW5nU2NoZW1hRGF0YVZhbGlkYXRvci5qYXZh) | | |
   | [...pache/bookkeeper/mledger/LedgerOffloaderStats.java](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9MZWRnZXJPZmZsb2FkZXJTdGF0cy5qYXZh) | | |
   | ... and [490 more](https://codecov.io/gh/apache/pulsar/pull/18084/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026036119


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {
+
+    /**
+     * Async push load data to store.
+     *
+     * @param key The load data key. (e.g. bundle)

Review Comment:
   Here is just an example. Actually, we always use the broker lookup service address as the key. I will remove this example, and explain it in impl class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998891940


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadDataTest.java:
##########
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class BrokerLoadDataTest {

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1032042101


##########
pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java:
##########
@@ -268,7 +268,7 @@ public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, f
                 bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
     }
 
-    private static double max(double... args) {
+     public static double max(double... args) {

Review Comment:
   ```suggestion
       public static double max(double... args) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026043664


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;

Review Comment:
   I have changed `register` and `unregister` methods to throw the  `MetadataStoreException `, and the `start`, `close` methods to throw `PulsarServerException`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025919161


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;

Review Comment:
   Throw `PulsarServerException ` might be better? Since unregister will call when close the load manager, and `PulsarService#close()` will catch the `PulsarServerException` and throw it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 merged pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 merged PR #18084:
URL: https://github.com/apache/pulsar/pull/18084


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1030276945


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */

Review Comment:
   The `LocalBrokerData` contains load data and lookup data. Split it will be more flexible for future extensions. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1030151132


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import java.util.Set;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+
+/**
+ * Determines which bundles should be split based on various thresholds.
+ *
+ * Migrate from {@link org.apache.pulsar.broker.loadbalance.BundleSplitStrategy}
+ */
+public interface NamespaceBundleSplitStrategy {
+
+    /**
+     * Determines which bundles, if any, should be split.
+     *
+     * @param context The context used for decisions.
+     * @param pulsarService Pulsar service to use.
+     * @return A set of the bundles that should be split.
+     */
+    Set<Split> findBundlesToSplit(LoadManagerContext context, PulsarService pulsarService);

Review Comment:
   Updated. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1309384138

   Il Mer 9 Nov 2022, 08:21 Kai Wang ***@***.***> ha scritto:
   
   > @heesung-sn <https://github.com/heesung-sn> Use
   > org.apache.pulsar.broker.loadbalance.overlord ok to me.
   >
   
   I am not sure.
   We are not used to this kind of creative names. We could do it but I prefer
   something that is meaningful for users.
   
   I would go with .extensions
   
   Enrico
   
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/pulsar/pull/18084#issuecomment-1308322218>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/ACIHZNUSI4NFG67RF2GUNHLWHNGGRANCNFSM6AAAAAARHZDICI>
   > .
   > You are receiving this because you commented.Message ID:
   > ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026091071


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {

Review Comment:
   Okey.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025919161


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;

Review Comment:
   Throw `PulsarServerException ` might be better? Since unregister will call when close the broker-registery and `BrokerService#unloadNamespaceBundlesGracefully`, `PulsarService#close()` will catch the `PulsarServerException`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1324469363

   Lgtm


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1030376426


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Get the broker lookup data.
+     *
+     * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    Optional<BrokerLookupData> lookup(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
+     */

Review Comment:
   No. It's okay to expose `forEach` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1307519987

   Sorry, my last email got corrupted. Here is the corrected one.
   
   I am not sure pulsar follows this naming convention, but I think we can
   make
   a creative name for this project and use it as the package name, like
   
    org.apache.pulsar.broker.loadbalance.(project name).
   
   I propose “Overlord”.
   
   org.apache.pulsar.broker.loadbalance.overlord
   
   Origin:
   https://starcraft.fandom.com/wiki/Overlord
   
   
   I am also ok with
   org.apache.pulsar.broker.loadbalance.extensions
   or org.apache.pulsar.broker.loadbalance.extensible.
   
   Thanks,
   Heesung
   
   On Tue, Nov 8, 2022 at 8:38 AM Heesung Sohn ***@***.***>
   wrote:
   
   > Hi,
   >
   > I am not sure pulsar follows this naming convention, but I think we can
   > make a creative name for this project and use it as the package name, like
   >
   >  org.apache.pulsar.broker.loadbalance.(project name).
   >
   > I propose “Overlord”.
   >
   > org.apache.pulsar.broker.loadbalance.overlord
   >
   > Origin:
   > https://starcraft.fandom.com/wiki/Overlord
   >
   >
   > I am also ok with
   > org.apache.pulsar.broker.loadbalance.extensions or
   > org.apache.pulsar.broker.loadbalance.extensible.
   >
   >
   > Thanks,
   > Heesung
   >
   >
   > On Tue, Nov 8, 2022 at 5:19 AM Kai Wang ***@***.***> wrote:
   >
   >> ***@***.**** commented on this pull request.
   >> ------------------------------
   >>
   >> In
   >> pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java
   >> <https://github.com/apache/pulsar/pull/18084#discussion_r1016623764>:
   >>
   >> > + * distributed with this work for additional information
   >> + * regarding copyright ownership.  The ASF licenses this file
   >> + * to you under the Apache License, Version 2.0 (the
   >> + * "License"); you may not use this file except in compliance
   >> + * with the License.  You may obtain a copy of the License at
   >> + *
   >> + *   http://www.apache.org/licenses/LICENSE-2.0
   >> + *
   >> + * Unless required by applicable law or agreed to in writing,
   >> + * software distributed under the License is distributed on an
   >> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   >> + * KIND, either express or implied.  See the License for the
   >> + * specific language governing permissions and limitations
   >> + * under the License.
   >> + */
   >> +package org.apache.pulsar.broker.loadbalance.extensible;
   >>
   >> I will more prefer to use org.apache.pulsar.broker.loadbalance.extensions
   >> or org.apache.pulsar.broker.loadbalance.extensible.
   >>
   >> @heesung-sn <https://github.com/heesung-sn> Do you have any other
   >> opinions?
   >>
   >> —
   >> Reply to this email directly, view it on GitHub
   >> <https://github.com/apache/pulsar/pull/18084#discussion_r1016623764>, or
   >> unsubscribe
   >> <https://github.com/notifications/unsubscribe-auth/AYVJ675AMK74H72NFMJEDUTWHJHPTANCNFSM6AAAAAARHZDICI>
   >> .
   >> You are receiving this because you were mentioned.Message ID:
   >> ***@***.***>
   >>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1018575096


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1015644591


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;

Review Comment:
   can we change this package name ?
   I suggest org.apache.pulsar.broker.loadbalance.api or org.apache.pulsar.broker.loadbalance.extensions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026006079


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {

Review Comment:
   The `LoadDataStore` can be a generic interface, so we don't need to implement load data stores for each load data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026045259


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.

Review Comment:
   Yes, you are right. The API docs don't need to cover the details, which I removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026092194


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+/**
+ * Defines the information required to broker lookup.
+ */
+public record BrokerLookupData (String webServiceUrl,

Review Comment:
   Thanks for your explain. Maybe we can change PIP-192. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1031953243


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;

Review Comment:
   I have removed some temporary unused methods from `BrokerLoadData`. I think we can add it when we use it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998998461


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;
+
+    private int numTopics;
+    private int numBundles;
+    private int numConsumers;
+    private int numProducers;
+
+    // All bundles belonging to this broker.
+    private Set<String> bundles;
+
+    // The bundles gained since the last invocation of update.
+    private Set<String> lastBundleGains;
+
+    // The bundles lost since the last invocation of update.
+    private Set<String> lastBundleLosses;
+
+    public BrokerLoadData() {
+        lastStats = new ConcurrentHashMap<>();
+        lastUpdate = System.currentTimeMillis();
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+        bundles = new HashSet<>();
+        lastBundleGains = new HashSet<>();
+        lastBundleLosses = new HashSet<>();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     * @param bundleStats
+     *            The bundle stats retrieved from the Pulsar client.
+     */
+    public void update(final SystemResourceUsage systemResourceUsage,
+                       final Map<String, NamespaceBundleStats> bundleStats) {
+        updateSystemResourceUsage(systemResourceUsage);
+        updateBundleData(bundleStats);
+        lastStats = bundleStats;
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+        updateBundleData(other.lastStats);
+        lastStats = other.lastStats;
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    // Aggregate all message, throughput, topic count, bundle count, consumer
+    // count, and producer count across the
+    // given data. Also keep track of bundle gains and losses.
+    private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStats) {
+        msgRateIn = 0;
+        msgRateOut = 0;
+        msgThroughputIn = 0;
+        msgThroughputOut = 0;
+        int totalNumTopics = 0;
+        int totalNumBundles = 0;
+        int totalNumConsumers = 0;
+        int totalNumProducers = 0;
+        final Iterator<String> oldBundleIterator = bundles.iterator();
+        while (oldBundleIterator.hasNext()) {
+            final String bundle = oldBundleIterator.next();
+            if (!bundleStats.containsKey(bundle)) {
+                // If this bundle is in the old bundle set but not the new one,
+                // we lost it.
+                lastBundleLosses.add(bundle);
+                oldBundleIterator.remove();
+            }
+        }
+        for (Map.Entry<String, NamespaceBundleStats> entry : bundleStats.entrySet()) {
+            final String bundle = entry.getKey();
+            final NamespaceBundleStats stats = entry.getValue();
+            if (!bundles.contains(bundle)) {
+                // If this bundle is in the new bundle set but not the old one,
+                // we gained it.
+                lastBundleGains.add(bundle);
+                bundles.add(bundle);
+            }
+            msgThroughputIn += stats.msgThroughputIn;
+            msgThroughputOut += stats.msgThroughputOut;
+            msgRateIn += stats.msgRateIn;
+            msgRateOut += stats.msgRateOut;
+            totalNumTopics += stats.topics;
+            ++totalNumBundles;
+            totalNumConsumers += stats.consumerCount;
+            totalNumProducers += stats.producerCount;
+        }
+        numTopics = totalNumTopics;
+        numBundles = totalNumBundles;
+        numConsumers = totalNumConsumers;
+        numProducers = totalNumProducers;
+    }
+
+    public void cleanDeltas() {
+        lastBundleGains.clear();
+        lastBundleLosses.clear();
+    }
+
+    public double getMaxResourceUsage() {
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+                ? conf.getLoadBalancerOverrideBrokerNicSpeedGbps().get() * gigaBitToByte : -1.0;
+    }
+
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputInUsage.usage != msgThroughputIn) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputOutUsage.usage != msgThroughputOut) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight(),
+                getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithInResourceWeight(),

Review Comment:
   
   
   Context:
   `getMaxResourceUsageWithExtendedNetworkSignal()` has been added in the PoC code. As an improvement, this func additionally considers msgThroughputInUsage, msgThroughputOutUsage (these are new), because bandwidthIn and bandwidthOut signals always output zero in the virtual network env. 
   
   I think we may need separate weight config for these new MsgThroughputIn and MsgThroughOut resources.
   
   getLoadBalancergetMsgThroughputInResourceWeight
   getLoadBalancergetMsgThroughputOutResourceWeight
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998893059


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;
+
+    private int numTopics;
+    private int numBundles;
+    private int numConsumers;
+    private int numProducers;
+
+    // All bundles belonging to this broker.
+    private Set<String> bundles;
+
+    // The bundles gained since the last invocation of update.
+    private Set<String> lastBundleGains;
+
+    // The bundles lost since the last invocation of update.
+    private Set<String> lastBundleLosses;
+
+    public BrokerLoadData() {
+        lastStats = new ConcurrentHashMap<>();
+        lastUpdate = System.currentTimeMillis();
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+        bundles = new HashSet<>();
+        lastBundleGains = new HashSet<>();
+        lastBundleLosses = new HashSet<>();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     * @param bundleStats
+     *            The bundle stats retrieved from the Pulsar client.
+     */
+    public void update(final SystemResourceUsage systemResourceUsage,
+                       final Map<String, NamespaceBundleStats> bundleStats) {
+        updateSystemResourceUsage(systemResourceUsage);
+        updateBundleData(bundleStats);
+        lastStats = bundleStats;
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+        updateBundleData(other.lastStats);
+        lastStats = other.lastStats;
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    // Aggregate all message, throughput, topic count, bundle count, consumer
+    // count, and producer count across the
+    // given data. Also keep track of bundle gains and losses.
+    private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStats) {
+        msgRateIn = 0;
+        msgRateOut = 0;
+        msgThroughputIn = 0;
+        msgThroughputOut = 0;
+        int totalNumTopics = 0;
+        int totalNumBundles = 0;
+        int totalNumConsumers = 0;
+        int totalNumProducers = 0;
+        final Iterator<String> oldBundleIterator = bundles.iterator();
+        while (oldBundleIterator.hasNext()) {
+            final String bundle = oldBundleIterator.next();
+            if (!bundleStats.containsKey(bundle)) {
+                // If this bundle is in the old bundle set but not the new one,
+                // we lost it.
+                lastBundleLosses.add(bundle);
+                oldBundleIterator.remove();
+            }
+        }
+        for (Map.Entry<String, NamespaceBundleStats> entry : bundleStats.entrySet()) {
+            final String bundle = entry.getKey();
+            final NamespaceBundleStats stats = entry.getValue();
+            if (!bundles.contains(bundle)) {
+                // If this bundle is in the new bundle set but not the old one,
+                // we gained it.
+                lastBundleGains.add(bundle);
+                bundles.add(bundle);
+            }
+            msgThroughputIn += stats.msgThroughputIn;
+            msgThroughputOut += stats.msgThroughputOut;
+            msgRateIn += stats.msgRateIn;
+            msgRateOut += stats.msgRateOut;
+            totalNumTopics += stats.topics;
+            ++totalNumBundles;
+            totalNumConsumers += stats.consumerCount;
+            totalNumProducers += stats.producerCount;
+        }
+        numTopics = totalNumTopics;
+        numBundles = totalNumBundles;
+        numConsumers = totalNumConsumers;
+        numProducers = totalNumProducers;
+    }
+
+    public void cleanDeltas() {
+        lastBundleGains.clear();
+        lastBundleLosses.clear();
+    }
+
+    public double getMaxResourceUsage() {
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+                ? conf.getLoadBalancerOverrideBrokerNicSpeedGbps().get() * gigaBitToByte : -1.0;
+    }
+
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputInUsage.usage != msgThroughputIn) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputOutUsage.usage != msgThroughputOut) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight(),
+                getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithInResourceWeight(),

Review Comment:
   Can you explain more about why we require updates in BrokerConfiguration?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998891257


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/models/Split.java:
##########
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.models;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Defines the information required for a bundle split.
+ */
+public record Split(String bundle, String sourceBroker, Map<String, Optional<String>> splitBundleToDestBroker) {

Review Comment:
   I will wait for your PR to merge and rebase it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1307504956

   Hi,
   
   I am not sure pulsar follows this naming convention, but I think we can
   make a creative name for this project and use it as the package name, like
   
    org.apache.pulsar.broker.loadbalance.(project name).
   
   I propose “Overlord”.
   
   org.apache.pulsar.broker.loadbalance.overlord
   
   Origin:
   https://starcraft.fandom.com/wiki/Overlord
   
   
   I am also ok with
   org.apache.pulsar.broker.loadbalance.extensions or
   org.apache.pulsar.broker.loadbalance.extensible.
   
   
   Thanks,
   Heesung
   
   
   On Tue, Nov 8, 2022 at 5:19 AM Kai Wang ***@***.***> wrote:
   
   > ***@***.**** commented on this pull request.
   > ------------------------------
   >
   > In
   > pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java
   > <https://github.com/apache/pulsar/pull/18084#discussion_r1016623764>:
   >
   > > + * distributed with this work for additional information
   > + * regarding copyright ownership.  The ASF licenses this file
   > + * to you under the Apache License, Version 2.0 (the
   > + * "License"); you may not use this file except in compliance
   > + * with the License.  You may obtain a copy of the License at
   > + *
   > + *   http://www.apache.org/licenses/LICENSE-2.0
   > + *
   > + * Unless required by applicable law or agreed to in writing,
   > + * software distributed under the License is distributed on an
   > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   > + * KIND, either express or implied.  See the License for the
   > + * specific language governing permissions and limitations
   > + * under the License.
   > + */
   > +package org.apache.pulsar.broker.loadbalance.extensible;
   >
   > I will more prefer to use org.apache.pulsar.broker.loadbalance.extensions
   > or org.apache.pulsar.broker.loadbalance.extensible.
   >
   > @heesung-sn <https://github.com/heesung-sn> Do you have any other
   > opinions?
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/pulsar/pull/18084#discussion_r1016623764>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AYVJ675AMK74H72NFMJEDUTWHJHPTANCNFSM6AAAAAARHZDICI>
   > .
   > You are receiving this because you were mentioned.Message ID:
   > ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026042591


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/LoadManagerScheduler.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import java.io.Closeable;
+
+/**
+ * The load manager scheduler.
+ */
+public interface LoadManagerScheduler extends Closeable {

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998890457


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;

Review Comment:
   Updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/TopBundlesLoadData.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+
+@Getter
+public class TopBundlesLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/top-bundle-load-data";
+
+    @JsonProperty("top_bundles_load_data")

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1309449780

   ok. I am fine with `.extensions` too.
   
   Thanks,
   Heesung
   
   On Wed, Nov 9, 2022 at 1:16 PM Enrico Olivelli ***@***.***>
   wrote:
   
   > Il Mer 9 Nov 2022, 08:21 Kai Wang ***@***.***> ha scritto:
   >
   > > @heesung-sn <https://github.com/heesung-sn> Use
   > > org.apache.pulsar.broker.loadbalance.overlord ok to me.
   > >
   >
   > I am not sure.
   > We are not used to this kind of creative names. We could do it but I prefer
   > something that is meaningful for users.
   >
   > I would go with .extensions
   >
   > Enrico
   >
   > > —
   > > Reply to this email directly, view it on GitHub
   > > <https://github.com/apache/pulsar/pull/18084#issuecomment-1308322218>,
   > or
   > > unsubscribe
   > > <
   > https://github.com/notifications/unsubscribe-auth/ACIHZNUSI4NFG67RF2GUNHLWHNGGRANCNFSM6AAAAAARHZDICI
   > >
   > > .
   > > You are receiving this because you commented.Message ID:
   > > ***@***.***>
   > >
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/pulsar/pull/18084#issuecomment-1309384138>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AYVJ674I3LHUL2PZFJDNN73WHQIB3ANCNFSM6AAAAAARHZDICI>
   > .
   > You are receiving this because you were mentioned.Message ID:
   > ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1027501178


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1027498084


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */

Review Comment:
   Maybe we can reuse `BrokerLoadData` and `BrokerLookupData` in `LocalBrokerData` now to avoid duplicated code.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Get the broker lookup data.
+     *
+     * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    Optional<BrokerLookupData> lookup(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
+     */

Review Comment:
   Why did you define a `forEach` method? Once we have a `Map<String, BrokerLookupData> map`, we can just use `map.forEach(...)`, it's more clear than `brokerRegistry.forEach(...)`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Get the broker lookup data.
+     *
+     * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    Optional<BrokerLookupData> lookup(String broker);

Review Comment:
   It seems to be a synchronous API. There are two questions:
   1. Why don't you add an asynchronous API instead?
   2. Why does it throw no exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1027465275


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();

Review Comment:
   I think we don't need a synchronous API in the interface. The synchronous APIs are usually exposed to application users for code simplicity. But in the core code of Pulsar, we should use asynchronous APIs as much as possible.
   
   If you want a synchronous API, you can simply use the `join` or `get` method, which is determined by if the exception should be ignored. BTW, when expose a synchronous API to users, the exception should not be ignored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1031939571


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    public BrokerLoadData() {
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     */
+    public void update(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage);
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    public double getMaxResourceUsage() {
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+                ? conf.getLoadBalancerOverrideBrokerNicSpeedGbps().get() * gigaBitToByte : -1.0;
+    }
+
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputInUsage.usage != msgThroughputIn) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputOutUsage.usage != msgThroughputOut) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight(),
+                getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithInResourceWeight(),
+                getMsgThroughputOutUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithOutResourceWeight())
+                / 100;
+    }
+
+    public double getMaxResourceUsage(ServiceConfiguration conf) {
+        return max(
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight())
+                / 100;
+    }
+
+    private static double maxWithinLimit(double limit, double...args) {
+        double max = 0.0;
+        for (double d : args) {
+            if (d > max && d <= limit) {
+                max = d;
+            }
+        }
+        return max;
+    }
+
+    private static double max(double...args) {
+        double max = Double.NEGATIVE_INFINITY;
+
+        for (double d : args) {
+            if (d > max) {
+                max = d;
+            }
+        }
+
+        return max;
+    }

Review Comment:
   Duplicated with `LocalBrokerData`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    public BrokerLoadData() {
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     */
+    public void update(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage);
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    public double getMaxResourceUsage() {
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+                ? conf.getLoadBalancerOverrideBrokerNicSpeedGbps().get() * gigaBitToByte : -1.0;
+    }
+
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputInUsage.usage != msgThroughputIn) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputOutUsage.usage != msgThroughputOut) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,

Review Comment:
   And we can't apply the limitation of resource usage. 
   https://github.com/apache/pulsar/issues/18598 has provided more details.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;

Review Comment:
   We have `org.apache.pulsar.client.api.SizeUnit`
   
   I think we can add a method in SizeUnit
   
   ```
       public double toBytes(double value) {
           return value * bytes;
       }
   ```
   
   And use the SizeUnit directly. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025515576


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.List;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+
+/**
+ * Filter out unqualified Brokers, which are not entered into LoadBalancer for decision-making.
+ */
+public interface BrokerFilter {
+
+    /**
+     * The broker filter name.
+     */
+    String name();
+
+    /**
+     * Filter out unqualified brokers based on implementation.
+     *
+     * @param brokers The full brokers.
+     * @param context The load manager context.
+     */
+    void filter(List<String> brokers, LoadManagerContext context) throws BrokerFilterException;

Review Comment:
   Though I see the API of `BrokerFilter` under `org.apache.pulsar.broker.loadbalance` package also returns void. But returning void looks not intuitive for a "filter".
   
   I prefer returning a `List<String>` no matter if a new list or the `brokers` is returned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025928362


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.

Review Comment:
   Where is load-manager broker cache? 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {

Review Comment:
   Seems better to change to `BrokerLookupDataRegistry` or `LocalBrokerDataRegistry(if you agree to change BrokerLooupData to LocalBrokerData)` ?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {
+
+    /**
+     * Async push load data to store.
+     *
+     * @param key The load data key. (e.g. bundle)

Review Comment:
   What are the elements of a bundle? Should there be a namespace to distinguish?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/LoadManagerScheduler.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import java.io.Closeable;
+
+/**
+ * The load manager scheduler.
+ */
+public interface LoadManagerScheduler extends Closeable {

Review Comment:
   Can you explain what this class schedules?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+/**
+ * Defines the information required to broker lookup.
+ */
+public record BrokerLookupData (String webServiceUrl,

Review Comment:
   Refer [PIP-192](https://github.com/apache/pulsar/issues/16691), this record should be `LocalBrokerData`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {

Review Comment:
   Refer PIP, Should change it to `BrokerLoadDataStore`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for service unit (e.g. bundle) through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager extends Closeable {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming service unit (e.g. bundle) selects the appropriate broker through strategies.
+     *
+     * @param topic The optional topic, some method won't provide topic var in this param
+     *              (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
+     *              So the topic is optional.
+     * @param serviceUnit service unit (e.g. bundle).
+     * @return Simple resource.
+     */
+    CompletableFuture<Optional<LookupResult>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

Review Comment:
   Who is to invoke this method? Should the method return `BrokerLookupData`? We should minimize coupling with the current implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998891502


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/store/LoadDataStore.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {
+
+    /**
+     * Push load data to store.
+     *
+     * @param key
+     *           The load data key.
+     * @param loadData
+     *           The load data.
+     */
+    void push(String key, T loadData) throws LoadDataStoreException;
+
+    /**
+     * Async push load data to store.
+     *
+     * @param key
+     *           The load data key.
+     * @param loadData
+     *           The load data.
+     */
+    CompletableFuture<Void> pushAsync(String key, T loadData);

Review Comment:
   Updated, and preserved the required method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1036751574


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;

Review Comment:
   Ping @codelipenghui



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1030140669


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import java.util.Set;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+
+/**
+ * Determines which bundles should be split based on various thresholds.
+ *
+ * Migrate from {@link org.apache.pulsar.broker.loadbalance.BundleSplitStrategy}
+ */
+public interface NamespaceBundleSplitStrategy {
+
+    /**
+     * Determines which bundles, if any, should be split.
+     *
+     * @param context The context used for decisions.
+     * @param pulsarService Pulsar service to use.
+     * @return A set of the bundles that should be split.
+     */
+    Set<Split> findBundlesToSplit(LoadManagerContext context, PulsarService pulsarService);

Review Comment:
   why do we need to pass PulsarService pulsarService here ?
   it is not an "interface" so we are leaking some "implementation details"
   
   in any case....won't it be injected to implementations at construction time ?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998433901


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.
+     *
+     * @param broker The load-balancer path.
+     */
+    Optional<BrokerLookupData> lookup(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress
+     */
+    void forEach(BiConsumer<String, BrokerLookupData> action);
+
+    /**
+     * Listen the broker register change.
+     */
+    void listen(BiConsumer<String, NotificationType> listener);
+
+    /**
+     * Close the broker registry.
+     */
+    void close() throws Exception;

Review Comment:
   should we throw `PulsarServerException` instead?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;

Review Comment:
   Please note that when we serialize this object to JSON(when publishing this object to the `BrokerLoadDataStore`). 
   
   We need to ignore the majority of these parameters (e.g. bundles and lastStats) to minimize the payload size. 



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadDataTest.java:
##########
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class BrokerLoadDataTest {

Review Comment:
   I think we need to add more unit tests for the public methods in this class.
   
   Please add `//: TODO` comment not to forget this work.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.
+     *
+     * @param broker The load-balancer path.

Review Comment:
   nit: can we specify the broker param format with an example?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.

Review Comment:
   nit: can we explain more about the `lookup service address` here?( format specification and example)



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.

Review Comment:
   nit: bundle selects -> service unit (e.g bundle) selects.
   
   I think we could use a generic term, `service unit` instead of bundle in our docs and variables.
   Please update this in the other places too.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    Optional<String> discover(ServiceUnitId serviceUnit);

Review Comment:
   In the PoC, we only call `discover` inside `assign`. Do we need to expose this discover in this interface?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    Optional<String> discover(ServiceUnitId serviceUnit);
+
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    CompletableFuture<Optional<LookupResult>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

Review Comment:
   nit: Ideally, I think we need to expect the `topic` to be non-optional. I remember This was optional in PoC because some of the callers do not have the topic var in their stack.
   `
   private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {`
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/models/Split.java:
##########
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.models;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Defines the information required for a bundle split.
+ */
+public record Split(String bundle, String sourceBroker, Map<String, Optional<String>> splitBundleToDestBroker) {

Review Comment:
   I see this `Split` and `Unload` records have been added in another PR. Hopefully, this does not conflict. https://github.com/apache/pulsar/pull/18079



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/TopBundlesLoadData.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+
+@Getter
+public class TopBundlesLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/top-bundle-load-data";
+
+    @JsonProperty("top_bundles_load_data")

Review Comment:
   Let's remove this JsonProperty("top_bundles_load_data") and confirm if this is really required.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/store/LoadDataStore.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.store;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+/**
+ * The load data store interface.
+ *
+ * @param <T> The Load data type.
+ */
+public interface LoadDataStore<T> extends Closeable {
+
+    /**
+     * Push load data to store.
+     *
+     * @param key
+     *           The load data key.
+     * @param loadData
+     *           The load data.
+     */
+    void push(String key, T loadData) throws LoadDataStoreException;
+
+    /**
+     * Async push load data to store.
+     *
+     * @param key
+     *           The load data key.
+     * @param loadData
+     *           The load data.
+     */
+    CompletableFuture<Void> pushAsync(String key, T loadData);

Review Comment:
   I think we mostly need pushAsync() and get().
   
   Can we start with the required ones first and add them later if required?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;
+
+    private int numTopics;
+    private int numBundles;
+    private int numConsumers;
+    private int numProducers;
+
+    // All bundles belonging to this broker.
+    private Set<String> bundles;
+
+    // The bundles gained since the last invocation of update.
+    private Set<String> lastBundleGains;
+
+    // The bundles lost since the last invocation of update.
+    private Set<String> lastBundleLosses;
+
+    public BrokerLoadData() {
+        lastStats = new ConcurrentHashMap<>();
+        lastUpdate = System.currentTimeMillis();
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+        bundles = new HashSet<>();
+        lastBundleGains = new HashSet<>();
+        lastBundleLosses = new HashSet<>();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     * @param bundleStats
+     *            The bundle stats retrieved from the Pulsar client.
+     */
+    public void update(final SystemResourceUsage systemResourceUsage,
+                       final Map<String, NamespaceBundleStats> bundleStats) {
+        updateSystemResourceUsage(systemResourceUsage);
+        updateBundleData(bundleStats);
+        lastStats = bundleStats;
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+        updateBundleData(other.lastStats);
+        lastStats = other.lastStats;
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    // Aggregate all message, throughput, topic count, bundle count, consumer
+    // count, and producer count across the
+    // given data. Also keep track of bundle gains and losses.
+    private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStats) {
+        msgRateIn = 0;
+        msgRateOut = 0;
+        msgThroughputIn = 0;
+        msgThroughputOut = 0;
+        int totalNumTopics = 0;
+        int totalNumBundles = 0;
+        int totalNumConsumers = 0;
+        int totalNumProducers = 0;
+        final Iterator<String> oldBundleIterator = bundles.iterator();
+        while (oldBundleIterator.hasNext()) {
+            final String bundle = oldBundleIterator.next();
+            if (!bundleStats.containsKey(bundle)) {
+                // If this bundle is in the old bundle set but not the new one,
+                // we lost it.
+                lastBundleLosses.add(bundle);
+                oldBundleIterator.remove();
+            }
+        }
+        for (Map.Entry<String, NamespaceBundleStats> entry : bundleStats.entrySet()) {
+            final String bundle = entry.getKey();
+            final NamespaceBundleStats stats = entry.getValue();
+            if (!bundles.contains(bundle)) {
+                // If this bundle is in the new bundle set but not the old one,
+                // we gained it.
+                lastBundleGains.add(bundle);
+                bundles.add(bundle);
+            }
+            msgThroughputIn += stats.msgThroughputIn;
+            msgThroughputOut += stats.msgThroughputOut;
+            msgRateIn += stats.msgRateIn;
+            msgRateOut += stats.msgRateOut;
+            totalNumTopics += stats.topics;
+            ++totalNumBundles;
+            totalNumConsumers += stats.consumerCount;
+            totalNumProducers += stats.producerCount;
+        }
+        numTopics = totalNumTopics;
+        numBundles = totalNumBundles;
+        numConsumers = totalNumConsumers;
+        numProducers = totalNumProducers;
+    }
+
+    public void cleanDeltas() {
+        lastBundleGains.clear();
+        lastBundleLosses.clear();
+    }
+
+    public double getMaxResourceUsage() {
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+                ? conf.getLoadBalancerOverrideBrokerNicSpeedGbps().get() * gigaBitToByte : -1.0;
+    }
+
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputInUsage.usage != msgThroughputIn) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputOutUsage.usage != msgThroughputOut) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight(),
+                getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithInResourceWeight(),

Review Comment:
   This change will require updates in BrokerConfiguration. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] heesung-sn commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
heesung-sn commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r999670196


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/data/BrokerLoadData.java:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible.data;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Data;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
+import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+
+/**
+ * Contains all the data that is maintained locally on each broker.
+ *
+ * Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
+ * And removed the lookup data, see {@link BrokerLookupData}
+ */
+@Data
+public class BrokerLoadData {
+
+    public static final String TOPIC =
+            TopicDomain.non_persistent
+                    + "://"
+                    + NamespaceName.SYSTEM_NAMESPACE
+                    + "/broker-load-data";
+    private static final double gigaBitToByte = 128 * 1024 * 1024.0;
+
+    // Most recently available system resource usage.
+    private ResourceUsage cpu;
+    private ResourceUsage memory;
+    private ResourceUsage directMemory;
+
+    private ResourceUsage bandwidthIn;
+    private ResourceUsage bandwidthOut;
+
+    // Message data from the most recent namespace bundle stats.
+    private double msgThroughputIn;
+    private ResourceUsage msgThroughputInUsage;
+    private double msgThroughputOut;
+    private ResourceUsage msgThroughputOutUsage;
+    private double msgRateIn;
+    private double msgRateOut;
+
+    // Timestamp of last update.
+    private long lastUpdate;
+
+    // The stats given in the most recent invocation of update.
+    private Map<String, NamespaceBundleStats> lastStats;
+
+    private int numTopics;
+    private int numBundles;
+    private int numConsumers;
+    private int numProducers;
+
+    // All bundles belonging to this broker.
+    private Set<String> bundles;
+
+    // The bundles gained since the last invocation of update.
+    private Set<String> lastBundleGains;
+
+    // The bundles lost since the last invocation of update.
+    private Set<String> lastBundleLosses;
+
+    public BrokerLoadData() {
+        lastStats = new ConcurrentHashMap<>();
+        lastUpdate = System.currentTimeMillis();
+        cpu = new ResourceUsage();
+        memory = new ResourceUsage();
+        directMemory = new ResourceUsage();
+        bandwidthIn = new ResourceUsage();
+        bandwidthOut = new ResourceUsage();
+        msgThroughputInUsage = new ResourceUsage();
+        msgThroughputOutUsage = new ResourceUsage();
+        bundles = new HashSet<>();
+        lastBundleGains = new HashSet<>();
+        lastBundleLosses = new HashSet<>();
+    }
+
+    /**
+     * Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
+     *
+     * @param systemResourceUsage
+     *            System resource usage (cpu, memory, and direct memory).
+     * @param bundleStats
+     *            The bundle stats retrieved from the Pulsar client.
+     */
+    public void update(final SystemResourceUsage systemResourceUsage,
+                       final Map<String, NamespaceBundleStats> bundleStats) {
+        updateSystemResourceUsage(systemResourceUsage);
+        updateBundleData(bundleStats);
+        lastStats = bundleStats;
+    }
+
+    /**
+     * Using another LocalBrokerData, update this.
+     *
+     * @param other
+     *            LocalBrokerData to update from.
+     */
+    public void update(final BrokerLoadData other) {
+        updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
+        updateBundleData(other.lastStats);
+        lastStats = other.lastStats;
+    }
+
+    // Set the cpu, memory, and direct memory to that of the new system resource usage data.
+    private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
+        updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
+                systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
+    }
+
+    // Update resource usage given each individual usage.
+    private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
+                                           final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
+                                           final ResourceUsage bandwidthOut) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.directMemory = directMemory;
+        this.bandwidthIn = bandwidthIn;
+        this.bandwidthOut = bandwidthOut;
+    }
+
+    // Aggregate all message, throughput, topic count, bundle count, consumer
+    // count, and producer count across the
+    // given data. Also keep track of bundle gains and losses.
+    private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStats) {
+        msgRateIn = 0;
+        msgRateOut = 0;
+        msgThroughputIn = 0;
+        msgThroughputOut = 0;
+        int totalNumTopics = 0;
+        int totalNumBundles = 0;
+        int totalNumConsumers = 0;
+        int totalNumProducers = 0;
+        final Iterator<String> oldBundleIterator = bundles.iterator();
+        while (oldBundleIterator.hasNext()) {
+            final String bundle = oldBundleIterator.next();
+            if (!bundleStats.containsKey(bundle)) {
+                // If this bundle is in the old bundle set but not the new one,
+                // we lost it.
+                lastBundleLosses.add(bundle);
+                oldBundleIterator.remove();
+            }
+        }
+        for (Map.Entry<String, NamespaceBundleStats> entry : bundleStats.entrySet()) {
+            final String bundle = entry.getKey();
+            final NamespaceBundleStats stats = entry.getValue();
+            if (!bundles.contains(bundle)) {
+                // If this bundle is in the new bundle set but not the old one,
+                // we gained it.
+                lastBundleGains.add(bundle);
+                bundles.add(bundle);
+            }
+            msgThroughputIn += stats.msgThroughputIn;
+            msgThroughputOut += stats.msgThroughputOut;
+            msgRateIn += stats.msgRateIn;
+            msgRateOut += stats.msgRateOut;
+            totalNumTopics += stats.topics;
+            ++totalNumBundles;
+            totalNumConsumers += stats.consumerCount;
+            totalNumProducers += stats.producerCount;
+        }
+        numTopics = totalNumTopics;
+        numBundles = totalNumBundles;
+        numConsumers = totalNumConsumers;
+        numProducers = totalNumProducers;
+    }
+
+    public void cleanDeltas() {
+        lastBundleGains.clear();
+        lastBundleLosses.clear();
+    }
+
+    public double getMaxResourceUsage() {
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+                bandwidthOut.percentUsage()) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
+                                                final double directMemoryWeight, final double bandwidthInWeight,
+                                                final double bandwidthOutWeight) {
+        return max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    public double getMaxResourceUsageWithWeightWithinLimit(final double cpuWeight, final double memoryWeight,
+                                                           final double directMemoryWeight,
+                                                           final double bandwidthInWeight,
+                                                           final double bandwidthOutWeight) {
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
+                directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+                bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+    }
+
+    private static double getNicSpeedBytesInSec(ServiceConfiguration conf) {
+        return conf.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
+                ? conf.getLoadBalancerOverrideBrokerNicSpeedGbps().get() * gigaBitToByte : -1.0;
+    }
+
+    synchronized ResourceUsage getMsgThroughputInUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputInUsage.usage != msgThroughputIn) {
+            msgThroughputInUsage = new ResourceUsage(msgThroughputIn, nicSpeedBytesInSec);
+        }
+        return msgThroughputInUsage;
+    }
+
+    synchronized ResourceUsage getMsgThroughputOutUsage(double nicSpeedBytesInSec) {
+        if (msgThroughputOutUsage.usage != msgThroughputOut) {
+            msgThroughputOutUsage = new ResourceUsage(msgThroughputOut, nicSpeedBytesInSec);
+        }
+        return msgThroughputOutUsage;
+    }
+
+    public double getMaxResourceUsageWithExtendedNetworkSignal(ServiceConfiguration conf) {
+
+        double nicSpeedBytesInSec = getNicSpeedBytesInSec(conf);
+        return maxWithinLimit(100.0d,
+                cpu.percentUsage() * conf.getLoadBalancerCPUResourceWeight(),
+                memory.percentUsage() * conf.getLoadBalancerMemoryResourceWeight(),
+                directMemory.percentUsage() * conf.getLoadBalancerDirectMemoryResourceWeight(),
+                bandwidthIn.percentUsage() * conf.getLoadBalancerBandwithInResourceWeight(),
+                bandwidthOut.percentUsage() * conf.getLoadBalancerBandwithOutResourceWeight(),
+                getMsgThroughputInUsage(nicSpeedBytesInSec).percentUsage()
+                        * conf.getLoadBalancerBandwithInResourceWeight(),

Review Comment:
   sure. we can add it later when needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1313071919

   Ping @eolivelli @codelipenghui @BewareMyPower @shibd 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1025980203


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;

Review Comment:
   It makes sense to me. But for my 2nd question, should we not catch any exception for other methods like `register` and `start`, but we need to catch `PulsarServerException` for `unregister`? It looks a little unnatural.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1016623764


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;

Review Comment:
   I will more prefer to use `org.apache.pulsar.broker.loadbalance.extensions` or `org.apache.pulsar.broker.loadbalance.extensible`. 
   
   @heesung-sn Do you have any other opinions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1030269101


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Get the broker lookup data.
+     *
+     * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    Optional<BrokerLookupData> lookup(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
+     */

Review Comment:
   In `BrokerRegistry` impl, it has `brokerLookupDataMap` to cache all `BrokerLookupData`, do you mean we expose this map? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1030278029


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry extends AutoCloseable {
+
+    /**
+     * Start broker registry.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register() throws MetadataStoreException;
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws MetadataStoreException;
+
+    /**
+     * Get the current broker lookup service address.
+     *
+     * @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     *
+     * @return The brokers service url list.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Get the broker lookup data.
+     *
+     * @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
+     */
+    Optional<BrokerLookupData> lookup(String broker);

Review Comment:
   Yes, we can use asynchronous API here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998890325


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    Optional<String> discover(ServiceUnitId serviceUnit);

Review Comment:
   Updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.

Review Comment:
   Updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/ExtensibleLoadManager.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Find the appropriate broker for bundle through different load balancer Implementation.
+ */
+public interface ExtensibleLoadManager {
+
+    /**
+     * Start the extensible load manager.
+     *
+     * 1. Start the broker registry.
+     * 2. Register self to registry.
+     * 3. Start the load data store.
+     * 4. Init the load manager context.
+     * 5. Start load data reporter.
+     * 6. Start the namespace unload scheduler.
+     * 7. Start the namespace split scheduler.
+     * 8. Listen the broker up or down, so we can split immediately.
+     */
+    void start() throws PulsarServerException;
+
+    /**
+     * Initialize this load manager using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    Optional<String> discover(ServiceUnitId serviceUnit);
+
+
+    /**
+     * The incoming bundle selects the appropriate broker through strategies.
+     *
+     * @param serviceUnit Bundle.
+     * @return Simple resource.
+     */
+    CompletableFuture<Optional<LookupResult>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r998890208


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.
+     *
+     * @param broker The load-balancer path.

Review Comment:
   Fixed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensible/BrokerRegistry.java:
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensible;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.loadbalance.extensible.data.BrokerLookupData;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Responsible for registering the current Broker lookup info to
+ * the distributed store (e.g. Zookeeper) for broker discovery.
+ */
+public interface BrokerRegistry {
+
+    /**
+     * Start broker registry.
+     */
+    void start();
+
+    /**
+     * Register local broker to metadata store.
+     */
+    void register();
+
+    /**
+     * Unregister the broker.
+     *
+     * Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
+     */
+    void unregister() throws PulsarServerException;
+
+    /**
+     * Get the current broker lookup service address.
+     */
+    String getLookupServiceAddress();
+
+    /**
+     * Get available brokers.
+     */
+    List<String> getAvailableBrokers();
+
+    /**
+     * Async get available brokers.
+     */
+    CompletableFuture<List<String>> getAvailableBrokersAsync();
+
+    /**
+     * Fetch local-broker data from load-manager broker cache.
+     *
+     * @param broker The load-balancer path.
+     */
+    Optional<BrokerLookupData> lookup(String broker);
+
+    /**
+     * For each the broker lookup data.
+     * The key is lookupServiceAddress
+     */
+    void forEach(BiConsumer<String, BrokerLookupData> action);
+
+    /**
+     * Listen the broker register change.
+     */
+    void listen(BiConsumer<String, NotificationType> listener);
+
+    /**
+     * Close the broker registry.
+     */
+    void close() throws Exception;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on code in PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#discussion_r1026003680


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.data;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+/**
+ * Defines the information required to broker lookup.
+ */
+public record BrokerLookupData (String webServiceUrl,

Review Comment:
   IMO, `BrokerLookupData` maybe better since we already have a `LocalBrokerData` class (it contains lookup data and load data) in the old load manager impl. This record should only have lookup data.
   
   The load data has split to `BrokerLoadData`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on pull request #18084: [improve][broker] PIP-192: Define new load manager base interfaces

Posted by GitBox <gi...@apache.org>.
Demogorgon314 commented on PR #18084:
URL: https://github.com/apache/pulsar/pull/18084#issuecomment-1308322218

   @heesung-sn Use `org.apache.pulsar.broker.loadbalance.overlord` ok to me.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org