You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 03:02:36 UTC
[pulsar] 05/15: fix shedding heartbeat ns (#13208)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 49caf878e1487dda6da509380c651366c0b8f0c7
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Mon Dec 13 19:44:55 2021 +0800
fix shedding heartbeat ns (#13208)
Related to #12252
I found that the problem mentioned in #12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what is actually provided is the full name of the bundle.
1. fix the pattern matching problem
2. add a test case for it
This change is already covered by existing tests.
(cherry picked from commit 78e3d8f7d872746db962be36ad3de49dac1ef015)
---
.../java/org/apache/pulsar/broker/loadbalance/LoadData.java | 10 ++++++++++
.../pulsar/broker/loadbalance/impl/OverloadShedder.java | 8 ++------
.../pulsar/broker/loadbalance/impl/ThresholdShedder.java | 6 +-----
.../org/apache/pulsar/broker/namespace/NamespaceService.java | 6 ++++++
.../org/apache/pulsar/common/naming/NamespaceBundle.java | 12 ++++++++++++
.../apache/pulsar/broker/namespace/NamespaceServiceTest.java | 8 ++++++++
6 files changed, 39 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a469c5c..4243420 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.broker.loadbalance;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
/**
* This class represents all data that could be relevant when making a load management decision.
@@ -59,6 +62,13 @@ public class LoadData {
return bundleData;
}
+ public Map<String, BundleData> getBundleDataForLoadShedding() { // bundle data with system-service namespaces filtered out
+ return bundleData.entrySet().stream()
+ .filter(e -> !NamespaceService.isSystemServiceNamespace( // keys are reduced to their namespace before matching
+ NamespaceBundle.getBundleNamespace(e.getKey())))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // collected into a new map on every call
+ }
+
public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index 3f33fa3..985ed6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
@@ -102,10 +100,8 @@ public class OverloadShedder implements LoadSheddingStrategy {
// Sort bundles by throughput, then pick the biggest N which combined
// make up for at least the minimum throughput to offload
- loadData.getBundleData().entrySet().stream()
- .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
- && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
- && localData.getBundles().contains(e.getKey()))
+ loadData.getBundleDataForLoadShedding().entrySet().stream()
+ .filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 3e10326..afca708 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
-import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
@@ -105,9 +103,7 @@ public class ThresholdShedder implements LoadSheddingStrategy {
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);
if (localData.getBundles().size() > 1) {
- loadData.getBundleData().entrySet().stream()
- .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
- && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches())
+ loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index f6cba9c..8f6bfb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1349,6 +1349,12 @@ public class NamespaceService implements AutoCloseable {
}
}
+ public static boolean isSystemServiceNamespace(String namespace) { // true if the name matches any of the three patterns below
+ return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() // matches() requires the whole input to match, so pass a namespace, not a bundle name
+ || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
+ || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
+ }
+
public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
if (isNameSpaceRegistered) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
index 1531095..98dcb93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
@@ -152,6 +152,18 @@ public class NamespaceBundle implements ServiceUnitId, Comparable<NamespaceBundl
return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
}
+ public static String getBundleNamespace(String namespaceBundle) { // returns the namespace portion of a bundle name string
+ int index = namespaceBundle.lastIndexOf('/'); // everything before the last '/' is treated as the namespace
+ if (index != -1) {
+ try {
+ return NamespaceName.get(namespaceBundle.substring(0, index)).toString();
+ } catch (Exception e) {
+ // Deliberately ignored: if NamespaceName.get rejects the prefix, fall through and return the input unchanged.
+ }
+ }
+ return namespaceBundle; // no '/' present, or the prefix was rejected above
+ }
+
public NamespaceBundleFactory getNamespaceBundleFactory() {
return factory;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 8d35cd3..d45dcc2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -539,6 +539,14 @@ public class NamespaceServiceTest extends BrokerTestBase {
}
}
+ @Test
+ public void testHeartbeatNamespaceMatch() throws Exception {
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
+ NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
+ assertTrue(NamespaceService.isSystemServiceNamespace( // the bundle's string form must be recognized once reduced to its namespace
+ NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
+ }
+
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {