Posted to commits@pulsar.apache.org by "Demogorgon314 (via GitHub)" <gi...@apache.org> on 2023/02/22 05:33:53 UTC

[GitHub] [pulsar] Demogorgon314 opened a new pull request, #19592: [improve][broker] PIP-192: Support broker isolation policy

Demogorgon314 opened a new pull request, #19592:
URL: https://github.com/apache/pulsar/pull/19592

   Master Issue: https://github.com/apache/pulsar/issues/16691
   
   ### Motivation
   
   Currently, the `ExtensibleLoadManager` does not support broker isolation policies.
   
   ### Modifications
   
   Add a `BrokerIsolationPoliciesFilter` to filter out brokers that do not satisfy the isolation policies.
   Add broker isolation support to `TransferShedder`.
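
To make the filtering idea concrete, here is a minimal sketch, not the code from this PR. It assumes an isolation policy can be reduced to a list of regular expressions over broker names. The class `BrokerIsolationFilterSketch` and its `filter` method are hypothetical and do not mirror the real `BrokerIsolationPoliciesFilter` signature.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the BrokerIsolationPoliciesFilter from the PR.
final class BrokerIsolationFilterSketch {

    /**
     * Returns the brokers whose name matches at least one allowed pattern.
     * An empty pattern list stands for "no isolation policy": every broker qualifies.
     */
    static <V> Map<String, V> filter(Map<String, V> availableBrokers, List<Pattern> allowedBrokers) {
        if (allowedBrokers.isEmpty()) {
            return new LinkedHashMap<>(availableBrokers);
        }
        Map<String, V> qualified = new LinkedHashMap<>();
        for (Map.Entry<String, V> entry : availableBrokers.entrySet()) {
            String broker = entry.getKey();
            boolean allowed = allowedBrokers.stream().anyMatch(p -> p.matcher(broker).matches());
            if (allowed) {
                qualified.put(broker, entry.getValue());
            }
        }
        return qualified;
    }
}
```

The sketch returns a new map rather than mutating its input, so a caller can keep the unfiltered broker set for other checks.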
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1122630418


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,

Review Comment:
   Yes, let's keep the logic here temporarily, since your proposal will change the original behavior. WDYT?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1119197844


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -334,16 +344,16 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context,
                     int remainingTopBundles = topBundlesLoadData.size();
                     for (var e : topBundlesLoadData) {
                         String bundle = e.bundleName();
-                        if (!recentlyUnloadedBundles.containsKey(bundle) && isTransferable(bundle)) {
+                        if (!recentlyUnloadedBundles.containsKey(bundle)
+                                && isTransferable(context, bundle, maxBroker, Optional.ofNullable(minBroker))) {
                             var bundleData = e.stats();
                             double throughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
                             if (remainingTopBundles > 1
                                     && (trafficMarkedToOffload < offloadThroughput
                                     || !atLeastOneBundleSelected)) {
                                 if (transfer) {
                                     selectedBundlesCache.put(maxBroker,
-                                            new Unload(maxBroker, bundle,
-                                                    Optional.of(minBroker)));
+                                            new Unload(maxBroker, bundle, Optional.ofNullable(minBroker)));

Review Comment:
   In what cases do we expect a null `minBroker`? If `minBroker` is null, we should skip this `Unload`.
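
A minimal sketch of the "skip when there is no target" behavior suggested here. `UnloadSketch` and `planTransfer` are hypothetical, simplified stand-ins, not the `Unload` type or the `TransferShedder` code from the PR.

```java
import java.util.Optional;

// Simplified, hypothetical stand-in for an unload decision.
final class UnloadSketch {
    final String sourceBroker;
    final String bundle;
    final Optional<String> destBroker;

    UnloadSketch(String sourceBroker, String bundle, Optional<String> destBroker) {
        this.sourceBroker = sourceBroker;
        this.bundle = bundle;
        this.destBroker = destBroker;
    }
}

final class TransferPlanSketch {

    /** Plans a transfer only when a target broker is known; otherwise skips the bundle. */
    static Optional<UnloadSketch> planTransfer(String maxBroker, String bundle, String minBroker) {
        return Optional.ofNullable(minBroker)
                .map(target -> new UnloadSketch(maxBroker, bundle, Optional.of(target)));
    }
}
```

With this shape the caller gets an empty result instead of an unload whose destination is missing.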



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -434,4 +452,45 @@ private boolean isTransferable(String bundle) {
         }
         return true;
     }
+
+    /**
+     * Checks whether the given bundle can be transferred or unloaded with isolation policies applied.
+     *
+     * @param context The load manager context.
+     * @param namespaceBundle The bundle to unload or transfer.
+     * @param maxBroker The current broker.
+     * @param minBroker The broker the bundle will be transferred to.
+     * @return Whether the bundle can be transferred or unloaded.
+     */
+    private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+                                                             NamespaceBundle namespaceBundle,
+                                                             String maxBroker,
+                                                             Optional<String> minBroker) {
+        if (isolationPoliciesHelper == null ||
+                !allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject())) {
+            return true;
+        }
+        int timeout = context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds();
+        boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled();
+        Map<String, BrokerLookupData> availableBrokers;
+        try {
+            availableBrokers =

Review Comment:
   Can we pass `availableBrokers` as a function argument? It seems redundant to fetch it for every candidate transfer.
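
A rough sketch of the refactoring suggested here: fetch the broker map once and pass it into the per-bundle check. `HoistedLookupSketch`, the `brokerRegistry` supplier, and the trivial `canTransfer` rule are all hypothetical and only stand in for the real lookup and isolation check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration; the real check consults isolation policies.
final class HoistedLookupSketch {

    static List<String> selectTransferable(List<String> bundles,
                                           Supplier<Map<String, String>> brokerRegistry,
                                           String targetBroker) {
        // Fetched once per shedding round instead of once per candidate bundle.
        Map<String, String> availableBrokers = brokerRegistry.get();
        List<String> selected = new ArrayList<>();
        for (String bundle : bundles) {
            if (canTransfer(availableBrokers, bundle, targetBroker)) {
                selected.add(bundle);
            }
        }
        return selected;
    }

    // Placeholder rule: a bundle is transferable if the target broker is available.
    static boolean canTransfer(Map<String, String> availableBrokers, String bundle, String targetBroker) {
        return availableBrokers.containsKey(targetBroker);
    }
}
```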





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1119533942


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -434,4 +452,45 @@ private boolean isTransferable(String bundle) {
         }
         return true;
     }
+
+    /**
+     * Checks whether the given bundle can be transferred or unloaded with isolation policies applied.
+     *
+     * @param context The load manager context.
+     * @param namespaceBundle The bundle to unload or transfer.
+     * @param maxBroker The current broker.
+     * @param minBroker The broker the bundle will be transferred to.
+     * @return Whether the bundle can be transferred or unloaded.
+     */
+    private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+                                                             NamespaceBundle namespaceBundle,
+                                                             String maxBroker,
+                                                             Optional<String> minBroker) {
+        if (isolationPoliciesHelper == null ||
+                !allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject())) {
+            return true;
+        }
+        int timeout = context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds();
+        boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled();
+        Map<String, BrokerLookupData> availableBrokers;
+        try {
+            availableBrokers =

Review Comment:
   Updated.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1125583304


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -434,4 +466,37 @@ private boolean isTransferable(String bundle) {
         }
         return true;
     }
+
+    /**
+     * Checks whether the given bundle can be transferred or unloaded with isolation policies applied.
+     *
+     * @param context The load manager context.
+     * @param availableBrokers The available brokers.
+     * @param namespaceBundle The bundle to unload or transfer.
+     * @param maxBroker The current broker.
+     * @param minBroker The broker the bundle will be transferred to.
+     * @return Whether the bundle can be transferred or unloaded.
+     */
+    private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+                                                             Map<String, BrokerLookupData> availableBrokers,
+                                                             NamespaceBundle namespaceBundle,
+                                                             String maxBroker,
+                                                             Optional<String> minBroker) {
+        if (isolationPoliciesHelper == null

Review Comment:
   `maxBroker` -> `currentBroker`
   `minBroker` -> `targetBroker`, would that be better?





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1122607503


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,

Review Comment:
   I don't think it is harmful to keep this logic here. I can add the bundle filtering logic (filtering out bundles with isolation and anti-affinity group policies from the top-k bundles) in my next PR (anti-affinity group support).





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1122601706


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,

Review Comment:
   Nit: 
   It is good to consider the isolation policy here in this shedding strategy. However, it might be better not to automatically unload/transfer bundles that configure any isolation policy or anti-affinity group at all.
   
   Reasoning: These bundles have limited sets of brokers to transfer/unload to, so they are not ideal candidates to move around for load balancing. (Hopefully, not all of the top-k loaded bundles are subject to an isolation policy.) It would be better to move other bundles that are not subject to any policy: "Fix bundles that have hard limits but move other free ones."
   
   For this reason, we could probably filter out bundles with an isolation or anti-affinity group policy in `TopKBundles`, just like we filter out the system namespace bundles. WDYT?
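
One way to picture this proposal is the sketch below. It is hypothetical and not the `TopKBundles` implementation: it assumes bundle names have the form `tenant/namespace/range` and that the set of namespaces with an isolation or anti-affinity policy is already known. `TopKFilterSketch`, `namespaceOf`, and `movableBundles` are invented names.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration; not the TopKBundles implementation.
final class TopKFilterSketch {

    /**
     * Extracts "tenant/namespace" from a bundle name such as
     * "tenant/namespace/0x00000000_0xffffffff". Assumes the name contains a '/'.
     */
    static String namespaceOf(String bundle) {
        return bundle.substring(0, bundle.lastIndexOf('/'));
    }

    /** Keeps only bundles whose namespace has no placement constraint. */
    static List<String> movableBundles(List<String> topBundles, Set<String> constrainedNamespaces) {
        return topBundles.stream()
                .filter(bundle -> !constrainedNamespaces.contains(namespaceOf(bundle)))
                .collect(Collectors.toList());
    }
}
```

The shedder would then pick unload candidates only from the returned list, leaving constrained bundles where they are.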
   
   





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1120552209


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    // Cache for primary brokers according to policies.
+    private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    // Cache for secondary (shared) brokers according to policies.
+    private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,
+                                              ServiceUnitId serviceUnit) {
+        Set<String> primariesCache = localPrimariesCache.get();
+        primariesCache.clear();
+
+        Set<String> secondaryCache = localSecondaryCache.get();
+        secondaryCache.clear();
+
+        NamespaceName namespace = serviceUnit.getNamespaceObject();
+        boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace);
+        boolean isNonPersistentTopic = serviceUnit instanceof NamespaceBundle
+                && ((NamespaceBundle) serviceUnit).hasNonPersistentTopic();
+        if (isIsolationPoliciesPresent) {
+            log.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
+        }
+
+        availableBrokers.forEach((broker, lookupData) -> {
+            final String brokerUrlString = String.format("http://%s", broker);

Review Comment:
   nit: It would be great if we could reuse the code from `LoadManagerShared`.
   
   We can create a static function, `LoadManagerShared.doApplyIsolationPolicies(brokerUrlString, nonPersistentTopicsEnabled, persistentTopicsEnabled...)`, and use it for both the extension and the existing load manager.
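
A minimal sketch of what such a shared static helper could look like. Everything here is hypothetical: `SharedIsolationSketch.classifyBroker` is not an existing `LoadManagerShared` method, the two predicates stand in for the policy checks, and the persistent/non-persistent topic flags are omitted for brevity.

```java
import java.net.URI;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of a shared helper; not the actual LoadManagerShared API.
final class SharedIsolationSketch {

    /**
     * Classifies one broker ("host:port") into the primary or secondary candidate set.
     * Both load managers could call this while iterating over their own broker collections.
     */
    static void classifyBroker(String broker,
                               Predicate<String> isPrimary,
                               Predicate<String> isSecondary,
                               Set<String> primaries,
                               Set<String> secondaries) {
        String host;
        try {
            host = URI.create("http://" + broker).getHost();
        } catch (IllegalArgumentException e) {
            return; // Unparseable address: leave the broker out of both sets.
        }
        if (host == null) {
            return;
        }
        if (isPrimary.test(host)) {
            primaries.add(broker);
        } else if (isSecondary.test(host)) {
            secondaries.add(broker);
        }
    }
}
```

Because the helper only takes the broker key and two sets, the extension can feed it the keys of its `Map<String, BrokerLookupData>` while the existing load manager feeds it its own broker set.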





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1119513659


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -334,16 +344,16 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context,
                     int remainingTopBundles = topBundlesLoadData.size();
                     for (var e : topBundlesLoadData) {
                         String bundle = e.bundleName();
-                        if (!recentlyUnloadedBundles.containsKey(bundle) && isTransferable(bundle)) {
+                        if (!recentlyUnloadedBundles.containsKey(bundle)
+                                && isTransferable(context, bundle, maxBroker, Optional.ofNullable(minBroker))) {
                             var bundleData = e.stats();
                             double throughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
                             if (remainingTopBundles > 1
                                     && (trafficMarkedToOffload < offloadThroughput
                                     || !atLeastOneBundleSelected)) {
                                 if (transfer) {
                                     selectedBundlesCache.put(maxBroker,
-                                            new Unload(maxBroker, bundle,
-                                                    Optional.of(minBroker)));
+                                            new Unload(maxBroker, bundle, Optional.ofNullable(minBroker)));

Review Comment:
   I see that the `hasTransferableBrokers` method already checks this and guarantees that a min broker exists. I will change it back.





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1122604968


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,

Review Comment:
   Good idea!





[GitHub] [pulsar] Demogorgon314 closed pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 closed pull request #19592: [improve][broker] PIP-192: Support broker isolation policy
URL: https://github.com/apache/pulsar/pull/19592




[GitHub] [pulsar] Technoboy- commented on pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#issuecomment-1463239478

   /pulsarbot run-failure-checks




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1122601706


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,

Review Comment:
   Nit:
   It is good to consider the isolation policy here in this shedding strategy. However, it might be better not to automatically unload/transfer bundles that configure any isolation policy or anti-affinity group at all.
   
   Reasoning: These bundles can only be placed on a limited set of brokers, which makes them poor candidates to move around for load balancing. (Hopefully, not all of the top-k loaded bundles are subject to an isolation policy.) It would be better to move other bundles that are not subject to any policy: "Fix bundles that have hard limits but move other free ones."
   
   For this reason, we could probably filter out bundles with an isolation or anti-affinity group policy in `TopKBundles`, just like we filter out the system namespace bundles. WDYT?
   
   





[GitHub] [pulsar] Demogorgon314 commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1125682901


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -434,4 +466,37 @@ private boolean isTransferable(String bundle) {
         }
         return true;
     }
+
+    /**
+     * Check the gave bundle and broker can be transfer or unload with isolation policies applied.
+     *
+     * @param context The load manager context.
+     * @param availableBrokers The available brokers.
+     * @param namespaceBundle The bundle try to unload or transfer.
+     * @param maxBroker The current broker.
+     * @param minBroker The broker will be transfer to.
+     * @return Can be transfer/unload or not.
+     */
+    private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+                                                             Map<String, BrokerLookupData> availableBrokers,
+                                                             NamespaceBundle namespaceBundle,
+                                                             String maxBroker,
+                                                             Optional<String> minBroker) {
+        if (isolationPoliciesHelper == null

Review Comment:
   Updated.





[GitHub] [pulsar] Technoboy- merged pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #19592:
URL: https://github.com/apache/pulsar/pull/19592




[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1122601706


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,

Review Comment:
   
   It is good to consider the isolation policy here in this shedding strategy. However, it might be better not to automatically unload/transfer bundles that configure any isolation policy or anti-affinity group at all.
   
   Reasoning: These bundles can only be placed on a limited set of brokers, which makes them poor candidates to move around for load balancing. (Hopefully, not all of the top-k loaded bundles are subject to an isolation policy.) It would be better to move other bundles that are not subject to any policy: "Fix bundles that have hard limits but move other free ones."
   
   For this reason, we could probably filter out bundles with an isolation or anti-affinity group policy in `TopKBundles`, just like we filter out the system namespace bundles. WDYT?
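
   A minimal sketch of the filtering idea in this comment, under stated assumptions. It is not the actual `TopKBundles` code: the class `TopKBundleFilterSketch`, the method `topK`, and the `pinnedNamespaces` set (namespaces that have an isolation or anti-affinity policy) are hypothetical names used only for illustration. The one thing taken from Pulsar is the bundle naming convention `tenant/namespace/range`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class TopKBundleFilterSketch {

    // Bundle names look like "tenant/namespace/0x00000000_0xffffffff";
    // everything before the last '/' is the namespace.
    static String namespaceOf(String bundle) {
        int idx = bundle.lastIndexOf('/');
        return idx < 0 ? bundle : bundle.substring(0, idx);
    }

    // Hypothetical helper: returns the k most loaded bundles, skipping any
    // bundle whose namespace is in pinnedNamespaces (assumed to be the set of
    // namespaces with an isolation or anti-affinity policy).
    static List<String> topK(Map<String, Double> bundleLoads, int k, Set<String> pinnedNamespaces) {
        return bundleLoads.entrySet().stream()
                .filter(e -> !pinnedNamespaces.contains(namespaceOf(e.getKey())))
                .sorted(Map.Entry.<String, Double>comparingByValue(Comparator.reverseOrder()))
                .limit(k)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Double> loads = Map.of(
                "t/ns-a/0x00000000_0x80000000", 90.0,
                "t/ns-b/0x00000000_0xffffffff", 70.0,
                "t/ns-c/0x00000000_0xffffffff", 50.0);
        // ns-a is pinned by a policy, so its bundle is never a shedding candidate.
        System.out.println(topK(loads, 2, Set.of("t/ns-a")));
    }
}
```

   With this shape, the shedder never sees policy-constrained bundles as candidates, so it does not need to re-check the target broker against the policy for the bundles it does pick.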
   
   





[GitHub] [pulsar] heesung-sn commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "heesung-sn (via GitHub)" <gi...@apache.org>.
heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1122648432


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,

Review Comment:
   Sure, sounds good to me.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #19592: [improve][broker] PIP-192: Support broker isolation policy

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1125583482


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -434,4 +466,37 @@ private boolean isTransferable(String bundle) {
         }
         return true;
     }
+
+    /**
+     * Check the gave bundle and broker can be transfer or unload with isolation policies applied.
+     *
+     * @param context The load manager context.
+     * @param availableBrokers The available brokers.
+     * @param namespaceBundle The bundle try to unload or transfer.
+     * @param maxBroker The current broker.
+     * @param minBroker The broker will be transfer to.
+     * @return Can be transfer/unload or not.
+     */
+    private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+                                                             Map<String, BrokerLookupData> availableBrokers,
+                                                             NamespaceBundle namespaceBundle,
+                                                             String maxBroker,
+                                                             Optional<String> minBroker) {
+        if (isolationPoliciesHelper == null

Review Comment:
   Ah, many places use the same name.


