You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 03:02:36 UTC

[pulsar] 05/15: fix shedding heartbeat ns (#13208)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49caf878e1487dda6da509380c651366c0b8f0c7
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Mon Dec 13 19:44:55 2021 +0800

    fix shedding heartbeat ns (#13208)
    
    Related to #12252
    
    I found that the problem mentioned in #12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what is actually provided is the full name of the bundle.
    
    1. fix the pattern matching problem
    2. add a test case for it
    
    This change is already covered by existing tests.
    
    (cherry picked from commit 78e3d8f7d872746db962be36ad3de49dac1ef015)
---
 .../java/org/apache/pulsar/broker/loadbalance/LoadData.java  | 10 ++++++++++
 .../pulsar/broker/loadbalance/impl/OverloadShedder.java      |  8 ++------
 .../pulsar/broker/loadbalance/impl/ThresholdShedder.java     |  6 +-----
 .../org/apache/pulsar/broker/namespace/NamespaceService.java |  6 ++++++
 .../org/apache/pulsar/common/naming/NamespaceBundle.java     | 12 ++++++++++++
 .../apache/pulsar/broker/namespace/NamespaceServiceTest.java |  8 ++++++++
 6 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a469c5c..4243420 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.broker.loadbalance;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 
 /**
  * This class represents all data that could be relevant when making a load management decision.
@@ -59,6 +62,13 @@ public class LoadData {
         return bundleData;
     }
 
+    public Map<String, BundleData> getBundleDataForLoadShedding() {
+        return bundleData.entrySet().stream()
+                .filter(e -> !NamespaceService.isSystemServiceNamespace(
+                        NamespaceBundle.getBundleNamespace(e.getKey())))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
     public Map<String, Long> getRecentlyUnloadedBundles() {
         return recentlyUnloadedBundles;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index 3f33fa3..985ed6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Map;
@@ -102,10 +100,8 @@ public class OverloadShedder implements LoadSheddingStrategy {
                 // Sort bundles by throughput, then pick the biggest N which combined
                 // make up for at least the minimum throughput to offload
 
-                loadData.getBundleData().entrySet().stream()
-                    .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
-                            && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
-                            && localData.getBundles().contains(e.getKey()))
+                loadData.getBundleDataForLoadShedding().entrySet().stream()
+                    .filter(e -> localData.getBundles().contains(e.getKey()))
                     .map((e) -> {
                         // Map to throughput value
                         // Consider short-term byte rate to address system resource burden
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 3e10326..afca708 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.HashMap;
@@ -105,9 +103,7 @@ public class ThresholdShedder implements LoadSheddingStrategy {
             MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
 
             if (localData.getBundles().size() > 1) {
-                loadData.getBundleData().entrySet().stream()
-                    .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
-                        && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches())
+                loadData.getBundleDataForLoadShedding().entrySet().stream()
                     .map((e) -> {
                         String bundle = e.getKey();
                         BundleData bundleData = e.getValue();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index f6cba9c..8f6bfb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1349,6 +1349,12 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public static boolean isSystemServiceNamespace(String namespace) {
+        return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
+                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
+    }
+
     public boolean registerSLANamespace() throws PulsarServerException {
         boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
         if (isNameSpaceRegistered) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
index 1531095..98dcb93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
@@ -152,6 +152,18 @@ public class NamespaceBundle implements ServiceUnitId, Comparable<NamespaceBundl
         return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
     }
 
+    public static String getBundleNamespace(String namespaceBundle) {
+        int index = namespaceBundle.lastIndexOf('/');
+        if (index != -1) {
+            try {
+                return NamespaceName.get(namespaceBundle.substring(0, index)).toString();
+            } catch (Exception e) {
+                // return itself if meets invalid format
+            }
+        }
+        return namespaceBundle;
+    }
+
     public NamespaceBundleFactory getNamespaceBundleFactory() {
         return factory;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 8d35cd3..d45dcc2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -539,6 +539,14 @@ public class NamespaceServiceTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testHeartbeatNamespaceMatch() throws Exception {
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
+        NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
+        assertTrue(NamespaceService.isSystemServiceNamespace(
+                        NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {