Posted to commits@pulsar.apache.org by "Demogorgon314 (via GitHub)" <gi...@apache.org> on 2023/04/10 15:00:30 UTC

[GitHub] [pulsar] Demogorgon314 opened a new pull request, #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Demogorgon314 opened a new pull request, #20058:
URL: https://github.com/apache/pulsar/pull/20058

   PIP: https://github.com/apache/pulsar/issues/16691
   
   ### Motivation
   
   Currently, while we are deploying or rolling back the load manager,
   the old and new brokers might report different owners for the same bundle,
   which causes producers or consumers to fail.
   
   ### Modifications
   
   After this PR, a lookup checks whether a deployment or rollback is in progress
   and forwards the request to the correct broker.
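
The redirect decision described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `RedirectSketch`, `BrokerInfo`, and `findRedirectTarget` are hypothetical names, and the sketch assumes that the most recently started broker carries the target load manager class name, so a broker whose own configured class name differs from it forwards the lookup there.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class RedirectSketch {
    /** Hypothetical stand-in for the lookup data each broker registers. */
    static final class BrokerInfo {
        final String serviceUrl;
        final long startTimestamp;
        final String loadManagerClassName;

        BrokerInfo(String serviceUrl, long startTimestamp, String loadManagerClassName) {
            this.serviceUrl = serviceUrl;
            this.startTimestamp = startTimestamp;
            this.loadManagerClassName = loadManagerClassName;
        }
    }

    /**
     * Returns the broker to redirect to, or empty when the local broker can serve the lookup.
     * Assumption: the most recently started broker runs the target load manager.
     */
    static Optional<BrokerInfo> findRedirectTarget(Map<String, BrokerInfo> brokers, String localLmClassName) {
        BrokerInfo latest = null;
        for (BrokerInfo candidate : brokers.values()) {
            if (latest == null || candidate.startTimestamp > latest.startTimestamp) {
                latest = candidate;
            }
        }
        // No brokers registered, or the local broker already runs the target load manager.
        if (latest == null || latest.loadManagerClassName.equals(localLmClassName)) {
            return Optional.empty();
        }
        return Optional.of(latest);
    }

    public static void main(String[] args) {
        Map<String, BrokerInfo> brokers = new HashMap<>();
        brokers.put("broker-1", new BrokerInfo("pulsar://broker-1:6650", 1_000L, "ModularLM"));
        brokers.put("broker-2", new BrokerInfo("pulsar://broker-2:6650", 2_000L, "ExtensionLM"));

        // A broker still configured with ModularLM is told to redirect to the newest broker.
        System.out.println(findRedirectTarget(brokers, "ModularLM").map(b -> b.serviceUrl));
        // A broker already configured with ExtensionLM serves the lookup itself.
        System.out.println(findRedirectTarget(brokers, "ExtensionLM").isPresent());
    }
}
```

The placeholder class names `ModularLM` and `ExtensionLM` follow the shorthand used in the review comments; a real broker would compare fully qualified class names.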
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162184812


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java:
##########
@@ -67,6 +69,16 @@ public Optional<String> getProtocol(String protocol) {
         return Optional.ofNullable(this.protocols().get(protocol));
     }
 

Review Comment:
   Yes, we need to override the method because we implement `ServiceLookupData`.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162187454


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {

Review Comment:
   I see. Maybe we can share a static function (`BrokerRegistry.getAvailableBrokerLookupDataAsync`) for this getter.
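
One possible shape for such a shared static helper is sketched below. It is only an illustration under stated assumptions: `LookupDataCollector` and `collectAsync` are hypothetical names, the `reader` function stands in for whatever performs the per-broker read, and plain `CompletableFuture.allOf` is used instead of Pulsar's `FutureUtil.waitForAll` so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class LookupDataCollector {
    private LookupDataCollector() {
    }

    /**
     * Reads the data of every broker concurrently and gathers the non-empty results into one map.
     * The reader is a hypothetical stand-in for the per-broker metadata read.
     */
    static <T> CompletableFuture<Map<String, T>> collectAsync(
            List<String> brokerIds,
            Function<String, CompletableFuture<Optional<T>>> reader) {
        // Callbacks may complete on different threads, so use a concurrent map.
        Map<String, T> result = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String brokerId : brokerIds) {
            futures.add(reader.apply(brokerId)
                    .thenAccept(dataOpt -> dataOpt.ifPresent(data -> result.put(brokerId, data))));
        }
        // Complete only after every per-broker read has finished.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> result);
    }

    public static void main(String[] args) {
        // broker-2 returns no data and is therefore skipped.
        Map<String, String> data = LookupDataCollector.<String>collectAsync(
                Arrays.asList("broker-1", "broker-2"),
                id -> CompletableFuture.completedFuture(
                        id.equals("broker-2") ? Optional.<String>empty() : Optional.of("data-" + id)))
                .join();
        System.out.println(data);
    }
}
```

Keeping the helper static and parameterized on the reader would let both the registry and the redirect path call it without sharing instance state.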





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162573922


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();

Review Comment:
   Updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> latestServiceLookupData = new AtomicReference<>();
+            AtomicLong lastStartTimestamp = new AtomicLong(0L);
+            lookupDataMap.forEach((key, value) -> {
+                if (lastStartTimestamp.get() < value.getStartTimestamp()) {
+                    lastStartTimestamp.set(value.getStartTimestamp());
+                    latestServiceLookupData.set(value);
+                }
+            });
+            if (latestServiceLookupData.get() == null) {
+                log.warn("No available broker found");
+                return Optional.empty();

Review Comment:
   Updated.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162213444


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_LM_class_filter";
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(
+            Map<String, BrokerLookupData> brokers,
+            ServiceUnitId serviceUnit,
+            LoadManagerContext context)
+            throws BrokerFilterException {
+        if (brokers.isEmpty()) {
+            return brokers;
+        }
+        brokers.entrySet().removeIf(entry -> {
+            BrokerLookupData v = entry.getValue();
+            return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());

Review Comment:
   Maybe we shouldn't dynamically change this config? If we only rely on restart to update the config, then we don't have this issue?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162185526


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -106,6 +107,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
 
+    private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

Review Comment:
   I see.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162218388


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_LM_class_filter";
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(
+            Map<String, BrokerLookupData> brokers,
+            ServiceUnitId serviceUnit,
+            LoadManagerContext context)
+            throws BrokerFilterException {
+        if (brokers.isEmpty()) {
+            return brokers;
+        }
+        brokers.entrySet().removeIf(entry -> {
+            BrokerLookupData v = entry.getValue();
+            return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());

Review Comment:
   Sorry, we are redirecting the lookups to the brokers that have the latest LM name. I think we are good here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.Set;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    @Override
+    public void filter(Set<String> brokers, BundleData bundleToAssign,
+                       LoadData loadData,
+                       ServiceConfiguration conf) throws BrokerFilterException {
+        loadData.getBrokerData().forEach((key, value) -> {
+            if (!value.getLocalData().getLoadManagerClassName()
+                    .equals(conf.getLoadManagerClassName())) {

Review Comment:
   Similarly,
   during the ExtensionLM deployment, brokers that have not been redeployed yet could still have the ModularLM class name in their config. In this case, those brokers will still assign bundles to the brokers with the ModularLM class name.
   
   Are admins expected to update the load manager class name for all brokers first, and then start the deployment?
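
The mixed-cluster behavior raised in this comment can be illustrated with a small sketch. It is not the filter from this PR: `LmClassFilterSketch` and `filterByLocalConfig` are hypothetical names, and brokers are reduced to a map from broker id to load manager class name. The point is that each broker filters candidates against its own configured class name, so the result depends on which broker runs the filter.

```java
import java.util.HashMap;
import java.util.Map;

final class LmClassFilterSketch {
    private LmClassFilterSketch() {
    }

    /**
     * Keeps only the brokers whose load manager class name matches the local broker's config.
     * Returns a copy so the caller's map is left untouched.
     */
    static Map<String, String> filterByLocalConfig(Map<String, String> brokerToLmClass, String localLmClassName) {
        Map<String, String> candidates = new HashMap<>(brokerToLmClass);
        candidates.values().removeIf(lmClassName -> !lmClassName.equals(localLmClassName));
        return candidates;
    }

    public static void main(String[] args) {
        // Mid-deployment cluster: broker-1 is not redeployed yet, broker-2 already is.
        Map<String, String> cluster = new HashMap<>();
        cluster.put("broker-1", "ModularLM");
        cluster.put("broker-2", "ExtensionLM");

        // A broker still configured with ModularLM only sees broker-1 as a candidate.
        System.out.println(filterByLocalConfig(cluster, "ModularLM").keySet());
        // A broker configured with ExtensionLM only sees broker-2 as a candidate.
        System.out.println(filterByLocalConfig(cluster, "ExtensionLM").keySet());
    }
}
```

On its own, a filter like this lets old brokers keep assigning bundles among themselves, which is why the question about the order of config updates matters.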





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162218388


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_LM_class_filter";
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(
+            Map<String, BrokerLookupData> brokers,
+            ServiceUnitId serviceUnit,
+            LoadManagerContext context)
+            throws BrokerFilterException {
+        if (brokers.isEmpty()) {
+            return brokers;
+        }
+        brokers.entrySet().removeIf(entry -> {
+            BrokerLookupData v = entry.getValue();
+            return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());

Review Comment:
   Oh, I am sorry. We are redirecting lookups to the brokers that have the latest (deploy or rollback target) LM name, so I think we are good here.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162187454


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {

Review Comment:
   I see. Maybe we can share a static function for this getter.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162211402


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOOKUP_DATA_PATH, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> serviceLookupData = new AtomicReference<>();

Review Comment:
   Updated.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOOKUP_DATA_PATH, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> serviceLookupData = new AtomicReference<>();
+            AtomicLong lastStartTimestamp = new AtomicLong(0L);
+            lookupDataMap.forEach((key, value) -> {
+                if (lastStartTimestamp.get() < value.getStartTimestamp()) {
+                    lastStartTimestamp.set(value.getStartTimestamp());
+                    serviceLookupData.set(value);
+                }
+            });
+            if (serviceLookupData.get() == null) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            if (serviceLookupData.get().getLoadManagerClassName().equals(currentLMClassName)) {
+                if (log.isDebugEnabled()) {

Review Comment:
   Sure



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {

Review Comment:
   Sure.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162211304


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java:
##########
@@ -115,6 +116,7 @@ public BrokerLookupData getLookupData(String version) {
         }};
         return new BrokerLookupData(
                 webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
-                pulsarServiceUrlTls, advertisedListeners, protocols, true, true, version);
+                pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
+                ExtensibleLoadManagerImpl.class.getName(), -1, version);

Review Comment:
   Updated.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162182853


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java:
##########
@@ -51,7 +51,7 @@
 @Slf4j
 public class BrokerRegistryImpl implements BrokerRegistry {
 
-    protected static final String LOOKUP_DATA_PATH = "/loadbalance/extension/brokers";
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";

Review Comment:
   Yes, both implement `ServiceLookupData`.





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1161885291


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java:
##########
@@ -67,6 +69,16 @@ public Optional<String> getProtocol(String protocol) {
         return Optional.ofNullable(this.protocols().get(protocol));
     }
 

Review Comment:
   Do we need to define these getters in the `record` class? I thought the getters were already provided.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {

Review Comment:
   Can we rename this to `BrokerLoadManagerClassFilter`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -106,6 +107,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
 
+    private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

Review Comment:
   Why do we need to change the leader znode path? Can't the same leader act as the leader for both the old and the new LM? Can't we use the same path, `/loadbalance/leader`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import java.util.Set;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    @Override
+    public void filter(Set<String> brokers, BundleData bundleToAssign,
+                       LoadData loadData,
+                       ServiceConfiguration conf) throws BrokerFilterException {
+        loadData.getBrokerData().forEach((key, value) -> {
+            if (!value.getLocalData().getLoadManagerClassName()
+                    .equals(conf.getLoadManagerClassName())) {

Review Comment:
   Similarly, during the ExtensionLM deployment, brokers that have not yet been redeployed could still have the ModularLM class name in their config. In this case, those brokers will still assign bundles to the brokers with the ModularLM class name.
   
   Are admins expected to update the load manager class name for all brokers first, and then start the deployment?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java:
##########
@@ -115,6 +116,7 @@ public BrokerLookupData getLookupData(String version) {
         }};
         return new BrokerLookupData(
                 webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
-                pulsarServiceUrlTls, advertisedListeners, protocols, true, true, version);
+                pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
+                ExtensibleLoadManagerImpl.class.getName(), -1, version);

Review Comment:
   We should pass `loadManagerClassName` as an argument and cover brokers with different `BrokerLookupData.loadManagerClassName` values during both deployment and rollback.
   
   case new LM deployment:
   1. brokers with new LM
   2. brokers with old LM
   
   case rollback to old LM:
   1. brokers with new LM
   2. brokers with old LM
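
The cases listed above can be sketched as follows. This is only an illustration under stated assumptions, not the PR's test code: `LoadManagerClassFilterSketch`, its `filter` helper, and the shortened class-name strings are hypothetical stand-ins, and the broker map holds plain strings rather than `BrokerLookupData`. The helper mirrors the `removeIf` comparison used by the filter in this diff.

```java
import java.util.HashMap;
import java.util.Map;

public class LoadManagerClassFilterSketch {

    // Placeholder names (assumption); real brokers advertise fully qualified class names.
    public static final String NEW_LM = "ExtensibleLoadManagerImpl";
    public static final String OLD_LM = "ModularLoadManagerImpl";

    // Keeps only the brokers whose advertised load manager class equals the
    // class configured on the broker that runs the filter.
    public static Map<String, String> filter(Map<String, String> brokerToLmClass,
                                             String localLmClass) {
        Map<String, String> result = new HashMap<>(brokerToLmClass);
        result.entrySet().removeIf(e -> !e.getValue().equals(localLmClass));
        return result;
    }
}
```

In a mixed cluster, calling `filter(cluster, NEW_LM)` models a broker that already runs the new LM, and `filter(cluster, OLD_LM)` models one that still runs the old LM. Both viewpoints occur during a deployment as well as during a rollback, which is why the test needs the class name as a parameter.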
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOOKUP_DATA_PATH, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> serviceLookupData = new AtomicReference<>();

Review Comment:
   Can we rename `serviceLookupData` to `latestServiceLookupData`?
   
   Also, I think we need multiple candidate brokers to redirect to (and randomly select one among them). Redirecting to a single broker could become a bottleneck in case of thundering-herd lookups.
   
   For this, we probably need two loops:
   First phase: find the `latestServiceLookupData`.
   Second phase: find the redirect candidates whose LoadManager class name is the same as that of `latestServiceLookupData`.
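
A minimal sketch of the two-phase selection described above, under stated assumptions: `RedirectCandidateSketch` and its nested `Broker` record are hypothetical stand-ins for `RedirectManager` and `BrokerLookupData`, and carry only the two fields the selection needs. It is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

public class RedirectCandidateSketch {

    // Hypothetical stand-in for BrokerLookupData (assumption, not the real type).
    public record Broker(String loadManagerClassName, long startTimestamp) {}

    // First phase: find the most recently started broker.
    public static Optional<Broker> findLatest(Map<String, Broker> brokers) {
        Broker latest = null;
        for (Broker b : brokers.values()) {
            if (latest == null || b.startTimestamp() > latest.startTimestamp()) {
                latest = b;
            }
        }
        return Optional.ofNullable(latest);
    }

    // Second phase: collect the ids of all brokers that run the same
    // load manager class as the most recently started broker.
    public static List<String> findCandidates(Map<String, Broker> brokers) {
        List<String> candidates = new ArrayList<>();
        Optional<Broker> latest = findLatest(brokers);
        if (latest.isEmpty()) {
            return candidates;
        }
        String target = latest.get().loadManagerClassName();
        brokers.forEach((id, b) -> {
            if (target.equals(b.loadManagerClassName())) {
                candidates.add(id);
            }
        });
        return candidates;
    }

    // Pick one candidate at random so that redirects spread across brokers.
    public static Optional<String> pickRedirectTarget(Map<String, Broker> brokers,
                                                      Random random) {
        List<String> candidates = findCandidates(brokers);
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(candidates.get(random.nextInt(candidates.size())));
    }
}
```

Starting from `null` instead of a timestamp of `0` means a broker is still selected when every start timestamp is zero or negative, a case the single loop in the diff would report as "No available broker found".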
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_LM_class_filter";
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(
+            Map<String, BrokerLookupData> brokers,
+            ServiceUnitId serviceUnit,
+            LoadManagerContext context)
+            throws BrokerFilterException {
+        if (brokers.isEmpty()) {
+            return brokers;
+        }
+        brokers.entrySet().removeIf(entry -> {
+            BrokerLookupData v = entry.getValue();
+            return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());

Review Comment:
   During the rollback, brokers that have not yet been rolled back could still have the Extension class name in their config. In this case, those brokers will still assign bundles to the brokers with the Extension class name.
   
   Are admins expected to update the load manager class name for all brokers first, and then start the deployment?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {

Review Comment:
   Do we need this class? Why not use BrokerRegistry?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java:
##########
@@ -51,7 +51,7 @@
 @Slf4j
 public class BrokerRegistryImpl implements BrokerRegistry {
 
-    protected static final String LOOKUP_DATA_PATH = "/loadbalance/extension/brokers";
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";

Review Comment:
   Is the registry data compatible with both the new and the old lookup data?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOOKUP_DATA_PATH, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> serviceLookupData = new AtomicReference<>();
+            AtomicLong lastStartTimestamp = new AtomicLong(0L);
+            lookupDataMap.forEach((key, value) -> {
+                if (lastStartTimestamp.get() < value.getStartTimestamp()) {
+                    lastStartTimestamp.set(value.getStartTimestamp());
+                    serviceLookupData.set(value);
+                }
+            });
+            if (serviceLookupData.get() == null) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            if (serviceLookupData.get().getLoadManagerClassName().equals(currentLMClassName)) {
+                if (log.isDebugEnabled()) {

Review Comment:
   Can we use ExtensibleLoadManagerImpl.debug()?





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162186415


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    protected static final String LOOKUP_DATA_PATH = "/loadbalance/brokers";
+
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {

Review Comment:
   If we use `BrokerRegistry`, we must register the calling broker with the registry. Maybe we can add a read-only mode for `BrokerRegistry`.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162185849


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_LM_class_filter";
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(
+            Map<String, BrokerLookupData> brokers,
+            ServiceUnitId serviceUnit,
+            LoadManagerContext context)
+            throws BrokerFilterException {
+        if (brokers.isEmpty()) {
+            return brokers;
+        }
+        brokers.entrySet().removeIf(entry -> {
+            BrokerLookupData v = entry.getValue();
+            return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());

Review Comment:
   Do you mean dynamically updating the config? A rolling update will restart the broker, right?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162186982


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLMClassFilter.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import java.util.Map;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+public class BrokerLMClassFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_LM_class_filter";
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(
+            Map<String, BrokerLookupData> brokers,
+            ServiceUnitId serviceUnit,
+            LoadManagerContext context)
+            throws BrokerFilterException {
+        if (brokers.isEmpty()) {
+            return brokers;
+        }
+        brokers.entrySet().removeIf(entry -> {
+            BrokerLookupData v = entry.getValue();
+            return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());

Review Comment:
   Yes, the dynamic config. And yes, a rolling update will restart the broker, but what if a lookup comes to the new broker, which still has the ExtensionLM class name?
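
   To make the concern concrete, here is a minimal sketch in plain Java, not the Pulsar API. The class `LmClassFilterSketch`, the method `filterByLoadManagerClass`, the broker ids, and the class-name strings are all hypothetical stand-ins. It only illustrates that the filter result depends on the class name configured on the broker that handles the lookup, so the same broker set can yield different results on different brokers.

```java
import java.util.HashMap;
import java.util.Map;

final class LmClassFilterSketch {

    // Keeps only the brokers whose advertised load manager class equals the
    // class name configured on the local broker. The input map is not modified.
    // (Hypothetical helper: broker id -> load manager class name.)
    static Map<String, String> filterByLoadManagerClass(
            Map<String, String> brokerToLmClass, String localLmClass) {
        Map<String, String> result = new HashMap<>(brokerToLmClass);
        result.entrySet().removeIf(e -> !e.getValue().equals(localLmClass));
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> brokers = new HashMap<>();
        brokers.put("broker-1", "ModularLM");     // assumed name for the old LM
        brokers.put("broker-2", "ExtensibleLM");  // assumed name for the new LM

        // The same broker set, seen from two brokers with different configs.
        System.out.println(filterByLoadManagerClass(brokers, "ExtensibleLM"));
        System.out.println(filterByLoadManagerClass(brokers, "ModularLM"));
    }
}
```

   If the configured class name can change without a restart, the second argument changes between calls, which is the case the question above is about.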





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162219908


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java:
##########
@@ -151,7 +151,7 @@ public String getBrokerId() {
     @Override
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
         this.checkState();
-        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList);

Review Comment:
   Why remove `thenApply(Lists::newArrayList)`? Can't the caller then modify the cached list?
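
   A minimal sketch of the risk, assuming the list returned by `listLocks` is shared with an internal cache (not verified here). `DefensiveCopySketch` and its methods are hypothetical, and `ArrayList::new` stands in for Guava's `Lists::newArrayList`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class DefensiveCopySketch {

    // Stand-in for a list held in a cache (hypothetical).
    private final List<String> cached =
            new ArrayList<>(Arrays.asList("broker-1", "broker-2"));

    // Hands out the cached list itself: callers can mutate the cache.
    CompletableFuture<List<String>> listShared() {
        return CompletableFuture.completedFuture(cached);
    }

    // Hands out a fresh copy: caller mutations stay local to the copy.
    CompletableFuture<List<String>> listCopied() {
        return CompletableFuture.completedFuture(cached).thenApply(ArrayList::new);
    }

    int cachedSize() {
        return cached.size();
    }

    public static void main(String[] args) {
        DefensiveCopySketch sketch = new DefensiveCopySketch();

        sketch.listCopied().join().add("broker-3");
        System.out.println(sketch.cachedSize()); // 2, the cache is untouched

        sketch.listShared().join().add("broker-3");
        System.out.println(sketch.cachedSize()); // 3, the cache was modified
    }
}
```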





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162225366


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java:
##########
@@ -151,7 +151,7 @@ public String getBrokerId() {
     @Override
     public CompletableFuture<List<String>> getAvailableBrokersAsync() {
         this.checkState();
-        return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList);

Review Comment:
   Oh, yes. Changed it back.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162480028


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();

Review Comment:
   Maybe we can throw an exception directly.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.coordination.LockManager;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+
+@Slf4j
+public class RedirectManager {
+    private final PulsarService pulsar;
+
+    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
+
+
+    public RedirectManager(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
+    }
+
+    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
+        return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> {
+            Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            for (String brokerId : availableBrokers) {
+                futures.add(this.brokerLookupDataLockManager.readLock(
+                        String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, brokerId)).thenAccept(lookupDataOpt -> {
+                    if (lookupDataOpt.isPresent()) {
+                        map.put(brokerId, lookupDataOpt.get());
+                    } else {
+                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
+                    }
+                }));
+            }
+
+            return FutureUtil.waitForAll(futures).thenApply(__ -> map);
+        });
+    }
+
+    public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
+        String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
+        boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
+        return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
+            if (lookupDataMap.isEmpty()) {
+                log.warn("No available broker found");
+                return Optional.empty();
+            }
+            AtomicReference<ServiceLookupData> latestServiceLookupData = new AtomicReference<>();
+            AtomicLong lastStartTimestamp = new AtomicLong(0L);
+            lookupDataMap.forEach((key, value) -> {
+                if (lastStartTimestamp.get() < value.getStartTimestamp()) {
+                    lastStartTimestamp.set(value.getStartTimestamp());
+                    latestServiceLookupData.set(value);
+                }
+            });
+            if (latestServiceLookupData.get() == null) {
+                log.warn("No available broker found");
+                return Optional.empty();

Review Comment:
   Maybe we can throw an exception directly.
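
   One way the suggestion could look, sketched with plain Java types instead of `BrokerLookupData` and `LookupResult`. The class name, the method name, and the choice of `IllegalStateException` are assumptions for illustration, not what the PR does. The sketch picks the broker with the latest start timestamp and fails the future when there is none, instead of returning `Optional.empty()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class RedirectLookupSketch {

    // Completes with the id of the broker that has the latest start timestamp.
    // If the map is empty, the returned future completes exceptionally.
    // (Hypothetical helper: broker id -> start timestamp in milliseconds.)
    static CompletableFuture<String> findLatestBroker(
            CompletableFuture<Map<String, Long>> startTimestamps) {
        return startTimestamps.thenApply(map -> map.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalStateException("No available broker found")));
    }

    public static void main(String[] args) {
        Map<String, Long> brokers = new HashMap<>();
        brokers.put("broker-1", 100L);
        brokers.put("broker-2", 200L);
        System.out.println(findLatestBroker(CompletableFuture.completedFuture(brokers)).join());

        Map<String, Long> none = new HashMap<>();
        try {
            findLatestBroker(CompletableFuture.completedFuture(none)).join();
        } catch (CompletionException e) {
            // join() wraps the exception thrown inside thenApply.
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

   With this shape the caller does not have to treat "no broker" and "no redirect needed" as the same empty result. Whether that distinction matters here is for the author to judge.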





[GitHub] [pulsar] Technoboy- merged pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #20058:
URL: https://github.com/apache/pulsar/pull/20058




[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1162184289


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -106,6 +107,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
 
+    private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

Review Comment:
   If the same leader acts as both the old and the new LM leader, there will be problems. When looking up the LM system topic, the request might be redirected to the old leader broker, but that broker has not started the new LM, so it will redirect to the new LM broker again...





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1161880903


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -106,6 +107,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
 
+    private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

Review Comment:
   Why do we need to change the leader znode path? (Can't the same leader play the old or new LM leader?) Can't we use the same path, `/loadbalance/leader`?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #20058: [improve][broker] PIP-192: Redirect the request to current load manager

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #20058:
URL: https://github.com/apache/pulsar/pull/20058#discussion_r1161880903


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -106,6 +107,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
 
+    private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

Review Comment:
   Why do we need to change the leader znode path? (Can't the same leader play both the old and the new LM leader?) Can't we use the same path, `/loadbalance/leader`?


